Skip to content

Commit

Permalink
update according to reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Sep 3, 2024
1 parent 7e34f1f commit 87c414b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 73 deletions.
127 changes: 63 additions & 64 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
return createConsumer.apply(this, arguments)
}

const eachMessageExtractor = (args) => {
const { topic, partition, message } = args[0]
return { topic, partition, message, groupId }
}

const eachBatchExtractor = (args) => {
const { batch } = args[0]
const { topic, partition, messages } = batch
return { topic, partition, messages, groupId }
}

const consumer = createConsumer.apply(this, arguments)

consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
Expand All @@ -108,75 +119,63 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf

const groupId = arguments[0].groupId
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
return run({
eachMessage: typeof eachMessage === 'function'
? function (...eachMessageArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { topic, partition, message } = eachMessageArgs[0]
consumerStartCh.publish({ topic, partition, message, groupId })
try {
const result = eachMessage.apply(this, eachMessageArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
consumerErrorCh.publish(err)
}
consumerFinishCh.publish(undefined)
})
)
} else {
consumerFinishCh.publish(undefined)
}
eachMessage = wrapFunction(
eachMessage,
consumerStartCh,
consumerFinishCh,
consumerErrorCh,
eachMessageExtractor
)

eachBatch = wrapFunction(
eachBatch,
batchConsumerStartCh,
batchConsumerFinishCh,
batchConsumerErrorCh,
eachBatchExtractor
)

return result
} catch (e) {
consumerErrorCh.publish(e)
consumerFinishCh.publish(undefined)
throw e
}
})
}
: eachMessage,
eachBatch:
typeof eachBatch === 'function'
? function (...eachBatchArgs) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const { batch } = eachBatchArgs[0]
const { topic, partition, messages } = batch
batchConsumerStartCh.publish({ topic, partition, messages, groupId })
try {
const result = eachBatch.apply(this, eachBatchArgs)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => batchConsumerFinishCh.publish()),
innerAsyncResource.bind((err) => {
if (err) {
batchConsumerErrorCh.publish(err)
}
batchConsumerFinishCh.publish()
})
)
} else {
batchConsumerFinishCh.publish()
}

return result
} catch (error) {
batchConsumerErrorCh.publish(error)
batchConsumerFinishCh.publish()
throw error
}
})
}
: eachBatch,
return run({
eachMessage,
eachBatch,
...runArgs
})
}

return consumer
})
return Kafka
})

const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
return typeof fn === 'function'
? function (...args) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const extractedArgs = extractArgs(args)
startCh.publish(extractedArgs)
try {
const result = fn.apply(this, args)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => finishCh.publish(undefined)),
innerAsyncResource.bind(err => {
if (err) {
errorCh.publish(err)
}
finishCh.publish(undefined)
})
)
} else {
finishCh.publish(undefined)
}
return result
} catch (e) {
errorCh.publish(e)
finishCh.publish(undefined)
throw e
}
})
}
: fn
}
16 changes: 7 additions & 9 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get operation () { return 'consume-batch' }

start ({ topic, partition, messages, groupId }) {
if (this.config.dsmEnabled) {
for (const message of messages) {
if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
}
}
if (!this.config.dsmEnabled) return
for (const message of messages) {
if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
}
}
}
Expand Down

0 comments on commit 87c414b

Please sign in to comment.