-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.js
37 lines (32 loc) · 870 Bytes
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const Kafka = require("kafkajs").Kafka;
// Start the consumer. `run` is a function declaration, so it is hoisted and
// can be invoked before its definition appears in the file. `run` handles its
// own errors, so the returned promise is not expected to reject.
run();

/**
 * Connects a KafkaJS consumer (group "id") to the configured broker,
 * subscribes to the "Users" topic from the beginning, and logs every
 * message received together with its partition.
 *
 * Any error raised while creating, connecting, subscribing or starting the
 * consumer is logged to stderr, and the consumer (if it was created) is
 * disconnected so the broker connection is not left dangling.
 *
 * @returns {Promise<void>} Settles once the consumer has been started, or
 *   once a start-up failure has been logged and cleaned up.
 */
async function run() {
  // Declared outside the try block so the catch handler can release it.
  let consumer;
  try {
    const kafka = new Kafka({
      clientId: "mynodeapp",
      brokers: ["192.168.255.193:9092"],
    });
    consumer = kafka.consumer({
      groupId: "id",
    });

    console.log("Connecting Consumer.....");
    await consumer.connect();
    console.log("Connected Consumer!!!");

    // subscribe to a topic
    await consumer.subscribe({
      topic: "Users",
      fromBeginning: true, // read from beginning if you are a new consumer
    });

    // run the consumer
    await consumer.run({
      eachMessage: async (result) => {
        console.log(
          `Received Message: ${result.message.value} on partition ${result.partition}`
        );
      },
    });
  } catch (error) {
    console.error(`Some error occured in consumer file: ${error}`);
    // Cleanup lives here rather than in a `finally`: on the success path the
    // consumer is meant to keep running and must not be disconnected.
    if (consumer) {
      try {
        await consumer.disconnect();
      } catch (disconnectError) {
        // Best-effort cleanup; report but do not mask the original failure.
        console.error(`Failed to disconnect consumer: ${disconnectError}`);
      }
    }
  }
}