initial commit
wconti27 committed Aug 30, 2024
1 parent 29195ed commit 95cdc1e
Showing 3 changed files with 15 additions and 231 deletions.
49 changes: 1 addition & 48 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -1,70 +1,23 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { extract } = require('./utils')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { isTrue } = require('../../dd-trace/src/util')
const coalesce = require('koalas')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
static get operation () { return 'consume-batch' }

configure (config) {
super.configure(coalesceConfiguration(config))
}

start ({ topic, partition, messages, groupId }) {
let childOf
if (this.config.batchedParentPropagationEnabled) {
for (const message of messages) {
// find the first valid context and use it as this span's parent
childOf = extract(this.tracer, message?.headers)
if (childOf && childOf._traceId !== null) {
break
}
}
}

const span = this.startSpan({
childOf,
resource: topic,
type: 'worker',
meta: {
component: 'kafkajs',
'kafka.topic': topic,
'kafka.message.offset': messages[0].offset,
'kafka.message.offset.last': messages[messages.length - 1].offset
},
metrics: {
'kafka.partition': partition,
'kafka.batch_size': messages.length
}
})

if (this.config.dsmEnabled) {
for (const message of messages) {
if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
-  .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], span, payloadSize)
+  .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
}
}
}
}
}

function coalesceConfiguration (config) {
// check if batch propagation is enabled via env variable
config.batchedParentPropagationEnabled = isTrue(
coalesce(
process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED,
config.batchedParentPropagationEnabled,
false
)
)

return config
}

module.exports = KafkajsBatchConsumerPlugin
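
Note: the deleted coalesceConfiguration helper follows dd-trace's usual precedence for settings: an environment variable overrides programmatic config, which overrides the default. A minimal sketch of that resolution, assuming koalas-style coalescing (first non-null/undefined value wins) and an isTrue() that accepts boolean true and the strings 'true'/'1'; the resolveBatchedPropagation name and the inlined helpers are illustrative, not the library's own:

// Illustrative re-implementations of the two helpers used above
const coalesce = (...values) => values.find((v) => v !== undefined && v !== null)
const isTrue = (v) => v === true || v === 'true' || v === '1'

function resolveBatchedPropagation (config) {
  return isTrue(coalesce(
    process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED, // env var wins
    config.batchedParentPropagationEnabled, // then programmatic config
    false // default: disabled
  ))
}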
35 changes: 14 additions & 21 deletions packages/datadog-plugin-kafkajs/src/consumer.js
@@ -1,7 +1,6 @@
'use strict'

const dc = require('dc-polyfill')
const { extract } = require('./utils')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
@@ -91,26 +90,6 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
}
}

// startBatch ({ topic, partition, messages, groupId }) {
// if (this.config.dsmEnabled) {
// this.tracer.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'])
// }
// this.startSpan({
// resource: topic,
// type: 'worker',
// meta: {
// component: 'kafkajs',
// 'kafka.topic': topic,
// 'kafka.message.offset': messages[0].offset,
// 'kafka.message.offset.last': messages[messages.length - 1].offset
// },
// metrics: {
// 'kafka.partition': partition,
// 'kafka.batch_size': messages.length
// }
// })
// }

finish () {
if (beforeFinishCh.hasSubscribers) {
beforeFinishCh.publish()
@@ -120,4 +99,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
}
}

function extract (tracer, bufferMap) {
if (!bufferMap) return null

// kafkajs delivers header values as Buffers; stringify them so the
// tracer's text-map propagator can read them
const textMap = {}

for (const key of Object.keys(bufferMap)) {
if (bufferMap[key] === null || bufferMap[key] === undefined) continue

textMap[key] = bufferMap[key].toString()
}

return tracer.extract('text_map', textMap)
}

module.exports = KafkajsConsumerPlugin
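
Note: the extract helper moved into consumer.js because kafkajs hands message headers to the consumer as a map of Buffers, while tracer.extract('text_map', ...) expects plain strings. A usage sketch under that assumption; the header names follow Datadog's default text-map propagation, and the values here are made up:

// Hypothetical headers as kafkajs would deliver them (Buffer values)
const headers = {
  'x-datadog-trace-id': Buffer.from('1234567890'),
  'x-datadog-parent-id': Buffer.from('987654321')
}
const childOf = extract(tracer, headers) // a SpanContext, or null if nothing extractable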
162 changes: 0 additions & 162 deletions packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -342,117 +342,6 @@ describe('Plugin', () => {
)
})

describe('consumer (eachBatch)', () => {
let consumer

beforeEach(async () => {
consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.subscribe({ topic: testTopic })
await consumer.connect()
})

afterEach(async () => {
await consumer.disconnect()
})

it('should be instrumented', async () => {
const expectedSpanPromise = expectSpanWithDefaults({
name: expectedSchema.receive.opName,
service: expectedSchema.receive.serviceName,
meta: {
'span.kind': 'consumer',
component: 'kafkajs'
},
resource: testTopic,
error: 0,
type: 'worker'
})

await consumer.run({
eachBatch: () => {}
})
await sendMessages(kafka, testTopic, messages)

return expectedSpanPromise
})

it('should run the consumer in the context of the consumer span', done => {
const firstSpan = tracer.scope().active()
let eachBatch = async ({ batch }) => {
const currentSpan = tracer.scope().active()

try {
expect(currentSpan).to.not.equal(firstSpan)
expect(currentSpan.context()._name).to.equal(expectedSchema.receive.opName)
done()
} catch (e) {
done(e)
} finally {
eachBatch = () => {} // avoid re-running the assertions for subsequent batches
}
}

consumer.run({ eachBatch: (...args) => eachBatch(...args) })
.then(() => sendMessages(kafka, testTopic, messages))
.catch(done)
})

it('should not propagate context by default', async () => {
const expectedSpanPromise = agent.use(traces => {
const span = traces[0][0]

expect(span).to.include({
name: 'kafka.consume',
service: 'test-kafka',
resource: testTopic
})
expect(span.metrics['kafka.batch_size']).to.be.at.least(1)

expect(parseInt(span.parent_id.toString())).to.equal(0)
})

await consumer.run({ eachBatch: () => {} })
await sendMessages(kafka, testTopic, messages)
await expectedSpanPromise
})

it('should be instrumented w/ error', async () => {
const fakeError = new Error('Oh No!')
const expectedSpanPromise = expectSpanWithDefaults({
name: expectedSchema.receive.opName,
service: expectedSchema.receive.serviceName,
meta: {
[ERROR_TYPE]: fakeError.name,
[ERROR_MESSAGE]: fakeError.message,
[ERROR_STACK]: fakeError.stack,
component: 'kafkajs'
},
resource: testTopic,
error: 1,
type: 'worker'
})

await consumer.subscribe({ topic: testTopic, fromBeginning: true })
await consumer.run({
eachBatch: async ({ batch }) => {
throw fakeError
}
})
await sendMessages(kafka, testTopic, messages)

return expectedSpanPromise
})

withNamingSchema(
async () => {
await consumer.run({ eachBatch: () => {} })
await sendMessages(kafka, testTopic, messages)
},
rawExpectedSchema.receive
)
})

describe('data stream monitoring', () => {
let consumer

@@ -617,57 +506,6 @@ describe('Plugin', () => {
})
})
})

describe('with configuration', () => {
const messages = [{ key: 'key1', value: 'test2' }]

beforeEach(async () => {
tracer = require('../../dd-trace').init({
kafkajs: { batchedParentPropagationEnabled: true }
})
await agent.load('kafkajs', { batchedParentPropagationEnabled: true })
const lib = require(`../../../versions/kafkajs@${version}`).get()
Kafka = lib.Kafka
kafka = new Kafka({
clientId: `kafkajs-test-${version}`,
brokers: ['127.0.0.1:9092'],
logLevel: lib.logLevel.WARN
})
})

describe('consumer (eachBatch)', () => {
let consumer

beforeEach(async () => {
consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.subscribe({ topic: testTopic })
await consumer.connect()
})

afterEach(async () => {
await consumer.disconnect()
})

it('should propagate context when configured', async () => {
const expectedSpanPromise = agent.use(traces => {
const span = traces[0][0]

expect(span).to.include({
name: 'kafka.consume',
service: 'test-kafka',
resource: testTopic
})
expect(span.metrics['kafka.batch_size']).to.be.at.least(1)

expect(parseInt(span.parent_id.toString())).to.be.gt(0)
})

await consumer.run({ eachBatch: () => {} })
await sendMessages(kafka, testTopic, messages)
await expectedSpanPromise
})
})
})
})
})
})
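
Note: for the 'should propagate context when configured' case to observe a non-zero parent_id, the producer side has to inject the active span context into each message's headers. A hypothetical sketch of what a helper like sendMessages would do, using dd-trace's inject(spanContext, format, carrier) API with the 'text_map' format; the variable names are illustrative:

// Assumes an active span (e.g. inside tracer.trace()) and a connected kafkajs producer
const headers = {}
tracer.inject(tracer.scope().active().context(), 'text_map', headers)
await producer.send({
  topic: testTopic,
  messages: [{ key: 'key1', value: 'test2', headers }] // kafkajs Buffer-encodes header values on the wire
})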
