client.js
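// client.js: a small Kafka CLI built on node-rdkafka and cliparse. The "produce"
// command reads lines from stdin and sends each one to a topic over SASL_SSL; the
// "consume" command prints messages from a topic to stdout.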
var cliparse = require("cliparse");
var readline = require('readline');
var kafka = require('node-rdkafka');

// Read lines from stdin; when producing, each line becomes one Kafka message
var stdin = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
  terminal: false
});
function produce(params) {
  var producer = new kafka.Producer({
    'metadata.broker.list': params.options.broker,
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/etc/ssl/certs',
    'api.version.request': true,
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': params.options.username,
    'sasl.password': params.options.password,
    'dr_cb': true // Specifies that we want a delivery-report event to be generated
  });

  producer.on('event.log', function(log) {
    console.log('log', log);
  });

  producer.on('error', function(err) {
    console.error('Error on producer', err);
  });

  producer.on('delivery-report', function(report) {
    console.log('Producer delivery-report', report);
  });

  producer.connect();
  // Poll regularly so that delivery-report events are actually emitted
  producer.setPollInterval(100);

  producer.on('ready', function() {
    console.log('Ready to produce messages. Write something to stdin...');
    try {
      var topic = producer.Topic(params.options.topic, {
        // Make the Kafka broker acknowledge each message (optional)
        'request.required.acks': 1
      });
      stdin.on('line', function(line) {
        // If partition is set to -1, the default partitioner is used
        var partition = -1;
        var value = Buffer.from(line); // Buffer.from() replaces the deprecated new Buffer()
        producer.produce(topic, partition, value);
      });
    } catch (err) {
      console.error('Failed to produce message', err);
    }
  });
}
function consume(params) {
  // Default the consumer group to "<username>.node" when none is given
  if (!params.options["consumer-group"]) {
    params.options["consumer-group"] = params.options.username + ".node";
  }

  var consumer = new kafka.KafkaConsumer({
    'metadata.broker.list': params.options.broker,
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/etc/ssl/certs',
    'api.version.request': true,
    'debug': 'protocol,security,broker',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': params.options.username,
    'sasl.password': params.options.password,
    'group.id': params.options["consumer-group"]
  });

  consumer.connect();

  consumer.on('ready', function() {
    console.log('Ready to consume messages...');
    consumer.consume(params.options.topic, function(err, message) {
      if (err) {
        console.error('Error while consuming', err);
        return;
      }
      console.log(message.value.toString());
    });
  });
}
var options = [
  cliparse.option("broker", { description: "Kafka broker address" }),
  cliparse.option("topic", { description: "Topic, prefixed by your namespace (e.g. --topic=myns.topic)" }),
  cliparse.option("username", { description: "SASL username, prefixed by your namespace (e.g. --username=myns.user)" }),
  cliparse.option("password", { description: "SASL password" }),
  cliparse.option("consumer-group", { description: "Consumer group, prefixed by your SASL username (e.g. --consumer-group=myns.user.g1)" })
];
cliparse.parse(
  cliparse.cli({
    name: "kafka-client",
    description: "Node.js Kafka client to produce/consume using SASL/SSL",
    commands: [
      cliparse.command("produce", { description: "Produce messages", options: options }, produce),
      cliparse.command("consume", { description: "Consume messages", options: options }, consume)
    ]
  })
);
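
// Example invocation (a sketch: the broker address, namespace, and credentials below
// are placeholders, not values taken from this repository):
//
//   node client.js produce --broker=my.broker.example.com:9092 --topic=myns.topic \
//     --username=myns.user --password=secret
//
//   node client.js consume --broker=my.broker.example.com:9092 --topic=myns.topic \
//     --username=myns.user --password=secret --consumer-group=myns.user.g1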