Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): add datastreams monitoring support for kafkajs batches #4645

Merged
merged 11 commits into from
Sep 3, 2024
95 changes: 66 additions & 29 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const consumerCommitCh = channel('apm:kafkajs:consume:commit')
const consumerFinishCh = channel('apm:kafkajs:consume:finish')
const consumerErrorCh = channel('apm:kafkajs:consume:error')

const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start')
const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')

function commitsFromEvent (event) {
const { payload: { groupId, topics } } = event
const commitList = []
Expand Down Expand Up @@ -103,39 +107,72 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
const run = consumer.run

const groupId = arguments[0].groupId

// Wrap consumer.run so that both delivery modes (eachMessage and eachBatch)
// publish start/error/finish events on their respective diagnostics channels.
// Handlers that are not functions are passed through untouched so kafkajs can
// surface its own validation errors.
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
  return run({
    eachMessage: typeof eachMessage === 'function'
      ? function (...eachMessageArgs) {
        // Each message gets its own async resource so channel subscribers
        // observe a consistent async context across the handler's lifetime.
        const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
        return innerAsyncResource.runInAsyncScope(() => {
          const { topic, partition, message } = eachMessageArgs[0]
          consumerStartCh.publish({ topic, partition, message, groupId })
          try {
            const result = eachMessage.apply(this, eachMessageArgs)
            // Support both sync and promise-returning handlers: finish (and
            // error) events must fire only after the handler truly completes.
            if (result && typeof result.then === 'function') {
              result.then(
                innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)),
                innerAsyncResource.bind(err => {
                  if (err) {
                    consumerErrorCh.publish(err)
                  }
                  consumerFinishCh.publish(undefined)
                })
              )
            } else {
              consumerFinishCh.publish(undefined)
            }

            return result
          } catch (e) {
            consumerErrorCh.publish(e)
            consumerFinishCh.publish(undefined)
            // Re-throw so kafkajs retry/seek semantics are unchanged.
            throw e
          }
        })
      }
      : eachMessage,
    eachBatch: typeof eachBatch === 'function'
      ? function (...eachBatchArgs) {
        const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
        return innerAsyncResource.runInAsyncScope(() => {
          // A batch payload carries the whole message set for one
          // topic/partition pair; publish it as a single start event.
          const { batch } = eachBatchArgs[0]
          const { topic, partition, messages } = batch
          batchConsumerStartCh.publish({ topic, partition, messages, groupId })
          try {
            const result = eachBatch.apply(this, eachBatchArgs)
            if (result && typeof result.then === 'function') {
              result.then(
                innerAsyncResource.bind(() => batchConsumerFinishCh.publish(undefined)),
                innerAsyncResource.bind(err => {
                  if (err) {
                    batchConsumerErrorCh.publish(err)
                  }
                  batchConsumerFinishCh.publish(undefined)
                })
              )
            } else {
              batchConsumerFinishCh.publish(undefined)
            }

            return result
          } catch (e) {
            batchConsumerErrorCh.publish(e)
            batchConsumerFinishCh.publish(undefined)
            throw e
          }
        })
      }
      : eachBatch,
    ...runArgs
  })
}
Expand Down
23 changes: 23 additions & 0 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

/**
 * Consumer plugin for kafkajs `eachBatch` deliveries. Listens on the
 * `apm:kafkajs:consume-batch:*` channels and, when Data Streams Monitoring
 * is enabled, records one consume checkpoint per message in the batch.
 */
class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
  static get id () { return 'kafkajs' }
  static get operation () { return 'consume-batch' }

  /**
   * Handle a batch-consume start event.
   *
   * @param {Object} ctx
   * @param {string} ctx.topic - topic the batch was read from
   * @param {number} ctx.partition - partition the batch was read from
   * @param {Array<Object>} ctx.messages - messages in the batch
   * @param {string} ctx.groupId - consumer group id
   */
  start ({ topic, partition, messages, groupId }) {
    if (!this.config.dsmEnabled) return
    // Guard against a missing message list so a malformed batch event
    // cannot throw inside the subscriber.
    for (const message of messages ?? []) {
      // Only messages that actually carry a DSM pathway context in their
      // headers contribute a checkpoint.
      if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) {
        const payloadSize = getMessageSize(message)
        this.tracer.decodeDataStreamsContext(message.headers)
        this.tracer
          .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
      }
    }
  }
}

module.exports = KafkajsBatchConsumerPlugin
4 changes: 3 additions & 1 deletion packages/datadog-plugin-kafkajs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

const ProducerPlugin = require('./producer')
const ConsumerPlugin = require('./consumer')
const BatchConsumerPlugin = require('./batch-consumer')
const CompositePlugin = require('../../dd-trace/src/plugins/composite')

/**
 * Composite kafkajs plugin: routes producer spans, per-message consumer
 * spans, and batch-consume DSM checkpoints to their dedicated sub-plugins.
 */
class KafkajsPlugin extends CompositePlugin {
  static get id () { return 'kafkajs' }
  static get plugins () {
    return {
      producer: ProducerPlugin,
      consumer: ConsumerPlugin,
      batchConsumer: BatchConsumerPlugin
    }
  }
}
Expand Down
18 changes: 16 additions & 2 deletions packages/datadog-plugin-kafkajs/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ describe('Plugin', () => {
)
})

describe('consumer', () => {
describe('consumer (eachMessage)', () => {
let consumer

beforeEach(async () => {
Expand Down Expand Up @@ -387,7 +387,7 @@ describe('Plugin', () => {
expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash)
})

it('Should set a checkpoint on consume', async () => {
it('Should set a checkpoint on consume (eachMessage)', async () => {
const runArgs = []
await consumer.run({
eachMessage: async () => {
Expand All @@ -401,6 +401,20 @@ describe('Plugin', () => {
}
})

// Mirrors the eachMessage checkpoint test above for the batch path:
// every batch handler invocation should observe a data-streams context
// whose hash matches the expected consumer hash.
it('Should set a checkpoint on consume (eachBatch)', async () => {
const runArgs = []
await consumer.run({
eachBatch: async () => {
// Capture the context most recently set by the instrumentation.
runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
}
})
await sendMessages(kafka, testTopic, messages)
await consumer.disconnect()
for (const runArg of runArgs) {
expect(runArg.hash).to.equal(expectedConsumerHash)
}
})

it('Should set a message payload size when producing a message', async () => {
const messages = [{ key: 'key1', value: 'test2' }]
if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
Expand Down
Loading