Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): add datastreams monitoring support for kafkajs batches #4645

Merged
merged 11 commits into from
Sep 3, 2024
98 changes: 67 additions & 31 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const consumerCommitCh = channel('apm:kafkajs:consume:commit')
const consumerFinishCh = channel('apm:kafkajs:consume:finish')
const consumerErrorCh = channel('apm:kafkajs:consume:error')

const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start')
const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')

function commitsFromEvent (event) {
const { payload: { groupId, topics } } = event
const commitList = []
Expand Down Expand Up @@ -96,50 +100,82 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
return createConsumer.apply(this, arguments)
}

const eachMessageExtractor = (args) => {
const { topic, partition, message } = args[0]
return { topic, partition, message, groupId }
}

const eachBatchExtractor = (args) => {
const { batch } = args[0]
const { topic, partition, messages } = batch
return { topic, partition, messages, groupId }
}

const consumer = createConsumer.apply(this, arguments)

consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)

const run = consumer.run

const groupId = arguments[0].groupId
consumer.run = function ({ eachMessage, ...runArgs }) {
if (typeof eachMessage !== 'function') return run({ eachMessage, ...runArgs })
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
eachMessage = wrapFunction(
eachMessage,
consumerStartCh,
consumerFinishCh,
consumerErrorCh,
eachMessageExtractor
)

eachBatch = wrapFunction(
eachBatch,
batchConsumerStartCh,
batchConsumerFinishCh,
batchConsumerErrorCh,
eachBatchExtractor
)

return run({
eachMessage: function (...eachMessageArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { topic, partition, message } = eachMessageArgs[0]
consumerStartCh.publish({ topic, partition, message, groupId })
try {
const result = eachMessage.apply(this, eachMessageArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
consumerErrorCh.publish(err)
}
consumerFinishCh.publish(undefined)
})
)
} else {
consumerFinishCh.publish(undefined)
}

return result
} catch (e) {
consumerErrorCh.publish(e)
consumerFinishCh.publish(undefined)
throw e
}
})
},
eachMessage,
eachBatch,
...runArgs
})
}

return consumer
})
return Kafka
})

const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
return typeof fn === 'function'
? function (...args) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const extractedArgs = extractArgs(args)
startCh.publish(extractedArgs)
try {
const result = fn.apply(this, args)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => finishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
errorCh.publish(err)
}
finishCh.publish(undefined)
})
)
} else {
finishCh.publish(undefined)
}
return result
} catch (e) {
errorCh.publish(e)
finishCh.publish(undefined)
throw e
}
})
}
: fn
}
21 changes: 21 additions & 0 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
static get operation () { return 'consume-batch' }

start ({ topic, partition, messages, groupId }) {
if (!this.config.dsmEnabled) return
for (const message of messages) {
if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
}
}
}

module.exports = KafkajsBatchConsumerPlugin
4 changes: 3 additions & 1 deletion packages/datadog-plugin-kafkajs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

const ProducerPlugin = require('./producer')
const ConsumerPlugin = require('./consumer')
const BatchConsumerPlugin = require('./batch-consumer')
const CompositePlugin = require('../../dd-trace/src/plugins/composite')

class KafkajsPlugin extends CompositePlugin {
static get id () { return 'kafkajs' }
static get plugins () {
return {
producer: ProducerPlugin,
consumer: ConsumerPlugin
consumer: ConsumerPlugin,
batchConsumer: BatchConsumerPlugin
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions packages/datadog-plugin-kafkajs/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ describe('Plugin', () => {
)
})

describe('consumer', () => {
describe('consumer (eachMessage)', () => {
let consumer

beforeEach(async () => {
Expand Down Expand Up @@ -387,7 +387,7 @@ describe('Plugin', () => {
expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash)
})

it('Should set a checkpoint on consume', async () => {
it('Should set a checkpoint on consume (eachMessage)', async () => {
const runArgs = []
await consumer.run({
eachMessage: async () => {
Expand All @@ -401,6 +401,20 @@ describe('Plugin', () => {
}
})

it('Should set a checkpoint on consume (eachBatch)', async () => {
const runArgs = []
await consumer.run({
eachBatch: async () => {
runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
}
})
await sendMessages(kafka, testTopic, messages)
await consumer.disconnect()
for (const runArg of runArgs) {
expect(runArg.hash).to.equal(expectedConsumerHash)
}
})

it('Should set a message payload size when producing a message', async () => {
const messages = [{ key: 'key1', value: 'test2' }]
if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
Expand Down
Loading