Skip to content

Commit

Permalink
feat(kafka): add datastreams monitoring support for kafkajs batches (#…
Browse files Browse the repository at this point in the history
…4645)

* Add DSM support for KafkaJS batch consume operation
  • Loading branch information
wconti27 authored and juan-fernandez committed Sep 30, 2024
1 parent a429889 commit 5f80887
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 34 deletions.
98 changes: 67 additions & 31 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const consumerCommitCh = channel('apm:kafkajs:consume:commit')
const consumerFinishCh = channel('apm:kafkajs:consume:finish')
const consumerErrorCh = channel('apm:kafkajs:consume:error')

const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start')
const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')

function commitsFromEvent (event) {
const { payload: { groupId, topics } } = event
const commitList = []
Expand Down Expand Up @@ -96,50 +100,82 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
return createConsumer.apply(this, arguments)
}

const eachMessageExtractor = (args) => {
const { topic, partition, message } = args[0]
return { topic, partition, message, groupId }
}

const eachBatchExtractor = (args) => {
const { batch } = args[0]
const { topic, partition, messages } = batch
return { topic, partition, messages, groupId }
}

const consumer = createConsumer.apply(this, arguments)

consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)

const run = consumer.run

const groupId = arguments[0].groupId
consumer.run = function ({ eachMessage, ...runArgs }) {
if (typeof eachMessage !== 'function') return run({ eachMessage, ...runArgs })
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
eachMessage = wrapFunction(
eachMessage,
consumerStartCh,
consumerFinishCh,
consumerErrorCh,
eachMessageExtractor
)

eachBatch = wrapFunction(
eachBatch,
batchConsumerStartCh,
batchConsumerFinishCh,
batchConsumerErrorCh,
eachBatchExtractor
)

return run({
eachMessage: function (...eachMessageArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { topic, partition, message } = eachMessageArgs[0]
consumerStartCh.publish({ topic, partition, message, groupId })
try {
const result = eachMessage.apply(this, eachMessageArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
consumerErrorCh.publish(err)
}
consumerFinishCh.publish(undefined)
})
)
} else {
consumerFinishCh.publish(undefined)
}

return result
} catch (e) {
consumerErrorCh.publish(e)
consumerFinishCh.publish(undefined)
throw e
}
})
},
eachMessage,
eachBatch,
...runArgs
})
}

return consumer
})
return Kafka
})

const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
return typeof fn === 'function'
? function (...args) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const extractedArgs = extractArgs(args)
startCh.publish(extractedArgs)
try {
const result = fn.apply(this, args)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => finishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
errorCh.publish(err)
}
finishCh.publish(undefined)
})
)
} else {
finishCh.publish(undefined)
}
return result
} catch (e) {
errorCh.publish(e)
finishCh.publish(undefined)
throw e
}
})
}
: fn
}
21 changes: 21 additions & 0 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
static get operation () { return 'consume-batch' }

start ({ topic, partition, messages, groupId }) {
if (!this.config.dsmEnabled) return
for (const message of messages) {
if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
}
}
}

module.exports = KafkajsBatchConsumerPlugin
4 changes: 3 additions & 1 deletion packages/datadog-plugin-kafkajs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

const ProducerPlugin = require('./producer')
const ConsumerPlugin = require('./consumer')
const BatchConsumerPlugin = require('./batch-consumer')
const CompositePlugin = require('../../dd-trace/src/plugins/composite')

class KafkajsPlugin extends CompositePlugin {
static get id () { return 'kafkajs' }
static get plugins () {
return {
producer: ProducerPlugin,
consumer: ConsumerPlugin
consumer: ConsumerPlugin,
batchConsumer: BatchConsumerPlugin
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions packages/datadog-plugin-kafkajs/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ describe('Plugin', () => {
)
})

describe('consumer', () => {
describe('consumer (eachMessage)', () => {
let consumer

beforeEach(async () => {
Expand Down Expand Up @@ -387,7 +387,7 @@ describe('Plugin', () => {
expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash)
})

it('Should set a checkpoint on consume', async () => {
it('Should set a checkpoint on consume (eachMessage)', async () => {
const runArgs = []
await consumer.run({
eachMessage: async () => {
Expand All @@ -401,6 +401,20 @@ describe('Plugin', () => {
}
})

it('Should set a checkpoint on consume (eachBatch)', async () => {
const runArgs = []
await consumer.run({
eachBatch: async () => {
runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
}
})
await sendMessages(kafka, testTopic, messages)
await consumer.disconnect()
for (const runArg of runArgs) {
expect(runArg.hash).to.equal(expectedConsumerHash)
}
})

it('Should set a message payload size when producing a message', async () => {
const messages = [{ key: 'key1', value: 'test2' }]
if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
Expand Down

0 comments on commit 5f80887

Please sign in to comment.