You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
C++ Toolchain [e.g. Visual Studio, llvm, g++]: g++ 12.2.0-14
node-rdkafka version [e.g. 2.3.3]: 3.0.1
Steps to Reproduce
Instantiate Kafka producer client
Encode message in AVRO
Send encoded message
Flush Kafka producer client
When producer.flush() is called, the client crashes with the message:
Error: Need to specify a timeout and a callback
at Producer.flush (/app/node_modules/node-rdkafka/lib/producer.js:252:16)
at file:///app/helpers/getKafkaProducer.js:76:16
at new Promise (<anonymous>)
at flushedClose (file:///app/helpers/getKafkaProducer.js:74:10)
at handler (file:///app/index.js:52:11)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
Here's a snippet of getKafkaProducer.js:
export async function flushedClose (producer) {
return new Promise((resolve, reject) => {
if (producer && producer.isConnected()) {
producer.flush(5000, (e) => {
if (e) {
return reject(e)
}
}).disconnect(5000, (e, metrics) => {
if (e) {
return reject(e)
}
if (metrics) {
console.log('Kafka disconnect metrics', metrics)
}
return resolve(metrics)
})
}
})
}
I think I may have found two issues. I should disconnect when the produce.flush() callback is called. The code would look like this:
export async function flushedClose (producer) {
return new Promise((resolve, reject) => {
if (producer && producer.isConnected()) {
producer.flush(5000, (e) => {
if (e) {
return reject(e)
}
producer.disconnect(5000, (e, metrics) => {
if (e) {
return reject(e)
}
if (metrics) {
console.debug('Kafka disconnect metrics', metrics)
}
return resolve(metrics)
})
})
}
})
}
The second problem was that the function was simplified, and the timeouts were using environment variables and the values were not parsed into numbers.
Environment Information
Steps to Reproduce
When
producer.flush()
is called, the client crashes with the message:Here's a snippet of
getKafkaProducer.js
:node-rdkafka Configuration Settings
Additional context
I followed the index.d.ts type definition for
flush
in theProducer
class:The text was updated successfully, but these errors were encountered: