Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: create trace span for kafkajs eachBatch #3400

130 changes: 87 additions & 43 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const consumerStartCh = channel('apm:kafkajs:consume:start')
const consumerFinishCh = channel('apm:kafkajs:consume:finish')
const consumerErrorCh = channel('apm:kafkajs:consume:error')

const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start')
const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')

addHook({ name: 'kafkajs', versions: ['>=1.4'] }, (obj) => {
class Kafka extends obj.Kafka {
constructor (options) {
Expand Down Expand Up @@ -70,52 +74,92 @@ addHook({ name: 'kafkajs', versions: ['>=1.4'] }, (obj) => {
return producer
})

shimmer.wrap(Kafka.prototype, 'consumer', createConsumer => function () {
if (!consumerStartCh.hasSubscribers) {
return createConsumer.apply(this, arguments)
}
shimmer.wrap(
Kafka.prototype,
'consumer',
(createConsumer) =>
function () {
if (!consumerStartCh.hasSubscribers) {
return createConsumer.apply(this, arguments)
}

const consumer = createConsumer.apply(this, arguments)
const run = consumer.run

const groupId = arguments[0].groupId
consumer.run = function ({ eachMessage, ...runArgs }) {
if (typeof eachMessage !== 'function') return run({ eachMessage, ...runArgs })

return run({
eachMessage: function (...eachMessageArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { topic, partition, message } = eachMessageArgs[0]
consumerStartCh.publish({ topic, partition, message, groupId })
try {
const result = eachMessage.apply(this, eachMessageArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
consumerErrorCh.publish(err)
const consumer = createConsumer.apply(this, arguments)
const run = consumer.run

const groupId = arguments[0].groupId
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
return run({
eachMessage:
typeof eachMessage === 'function'
? function (...eachMessageArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { topic, partition, message } = eachMessageArgs[0]
consumerStartCh.publish({ topic, partition, message, groupId })

try {
const result = eachMessage.apply(this, eachMessageArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => consumerFinishCh.publish()),
innerAsyncResource.bind((err) => {
if (err) {
consumerErrorCh.publish(err)
}
consumerFinishCh.publish()
})
)
} else {
consumerFinishCh.publish()
}

return result
} catch (error) {
consumerErrorCh.publish(error)
consumerFinishCh.publish()
throw error
}
consumerFinishCh.publish(undefined)
})
)
} else {
consumerFinishCh.publish(undefined)
}

return result
} catch (e) {
consumerErrorCh.publish(e)
consumerFinishCh.publish(undefined)
throw e
}
}
: eachMessage,
eachBatch:
typeof eachBatch === 'function'
? function (...eachBatchArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { batch } = eachBatchArgs[0]
const { topic, partition, messages } = batch
batchConsumerStartCh.publish({ topic, partition, messages, groupId })
try {
const result = eachBatch.apply(this, eachBatchArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => batchConsumerFinishCh.publish()),
innerAsyncResource.bind((err) => {
if (err) {
batchConsumerErrorCh.publish(err)
}
batchConsumerFinishCh.publish()
})
)
} else {
batchConsumerFinishCh.publish()
}

return result
} catch (error) {
batchConsumerErrorCh.publish(error)
batchConsumerFinishCh.publish()
throw error
}
})
}
: eachBatch,
...runArgs
})
},
...runArgs
})
}
return consumer
})
}
return consumer
}
)
return obj
})
56 changes: 56 additions & 0 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'
const ConsumerPlugin = require('dd-trace/packages/dd-trace/src/plugins/consumer')

const kafkaMessageStub = { headers: [] }

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () {
return 'kafkajs'
}
static get operation () {
return 'consume-batch'
}

start ({ topic, partition, messages, groupId }) {
const message = process.env.DD_EXPERIMENTAL_KAFKAJS_PLUGIN_TRACE_FIRST_BATCH_MESSAGE
? messages[0]
: kafkaMessageStub

if (this.config.dsmEnabled) {
this.tracer.decodeDataStreamsContext(message.headers['dd-pathway-ctx'])
this.tracer.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'])
}
const childOf = extract(this.tracer, message.headers)
this.startSpan({
childOf,
resource: topic,
type: 'worker',
meta: {
component: 'kafkajs',
'kafka.topic': topic,
'kafka.message.offset': messages[0].offset,
'kafka.message.offsets': messages.map((m) => m.offset)
},
metrics: {
'kafka.partition': partition,
'kafka.batch_size': messages.length
}
})
}
}

function extract (tracer, bufferMap) {
if (!bufferMap) return null

const textMap = {}

for (const key of Object.keys(bufferMap)) {
if (bufferMap[key] === null || bufferMap[key] === undefined) continue

textMap[key] = bufferMap[key].toString()
}

return tracer.extract('text_map', textMap)
}

module.exports = KafkajsBatchConsumerPlugin
4 changes: 3 additions & 1 deletion packages/datadog-plugin-kafkajs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

const ProducerPlugin = require('./producer')
const ConsumerPlugin = require('./consumer')
const BatchConsumerPlugin = require('./batch-consumer')
const CompositePlugin = require('../../dd-trace/src/plugins/composite')

class KafkajsPlugin extends CompositePlugin {
static get id () { return 'kafkajs' }
static get plugins () {
return {
producer: ProducerPlugin,
consumer: ConsumerPlugin
consumer: ConsumerPlugin,
batchConsumer: BatchConsumerPlugin
}
}
}
Expand Down
Loading