-
Notifications
You must be signed in to change notification settings - Fork 5
/
index.js
92 lines (85 loc) · 3.27 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/**
* Creates kafka consumer and producer observables given a kafka client adapter
* and an options object.
*
* @example <caption>consumer</caption>
* const opts = { brokers: 'kafka://127.0.0.1:9092', groupId: 'me' };
* const KafkaObservable = require('kafka-observable')(opts);
* const observable = KafkaObservable.fromTopic('my_topic');
*
* @example <caption>producer</caption>
* const opts = { brokers: 'kafka://127.0.0.1:9092' };
* const KafkaObservable = require('kafka-observable')(opts);
* const observable = KafkaObservable.toTopic('my_topic', 'my message');
*
* @module KafkaObservable
* @author ghermeto
**/
'use strict';
const Consumer = require('./lib/observables/consumer');
const Producer = require('./lib/observables/producer');
const jsonMessage = require('./lib/operators/json-message');
const kafkaMessage = require('./lib/operators/kafka-message');
/**
 * Builds an observable factory. The consumer and producer creators it returns
 * fall back to the `options` and `clientFactory` given here whenever a call
 * does not supply its own.
 *
 * @param {Object} [options] client options
 * @param {boolean} options.autoCommit automatically commits read message
 * @param {Object|String} options.partitioner name or class for partitioner implementation
 * @param {String} options.strategy name of the assignment strategy
 * @param {Object} [clientFactory] your custom kafka client adapter
 * @returns {ObservableFactory}
 * @see {@link https://github.com/oleksiyk/kafka/blob/master/README.md} for default client options.
 */
function KafkaObservable(options, clientFactory) {
  /**
   * Creates a Consumer observable
   * @param {string} topic topic to subscribe
   * @param {Object} [opts] kafka client options; defaults to the factory's `options`
   * @param {Object} [client] kafka adapter; defaults to the factory's `clientFactory`
   * @returns {Observable}
   * @see {@link observables/Consumer}
   */
  const fromTopic = (topic, opts = options, client = clientFactory) =>
    Consumer.create(topic, opts, client);

  /**
   * Creates a Producer observable
   * @param {string} topic topic the message(s) are sent to
   * @param {Array|Observable} messages message(s) to be sent
   * @param {Object} [opts] kafka client options; defaults to the factory's `options`
   * @param {Object} [client] kafka adapter; defaults to the factory's `clientFactory`
   * @returns {Observable}
   * @see {@link observables/Producer}
   */
  const toTopic = (topic, messages, opts = options, client = clientFactory) =>
    Producer.create(topic, messages, opts, client);

  /**
   * Object capable of creating both the consumer and producer observables
   * @inner
   * @name ObservableFactory
   * @typedef {Object} ObservableFactory
   * @property {Function} fromTopic same as fromTopic from prototype
   * @property {Function} toTopic same as toTopic from prototype
   * @property {Function} JSONMessage operator: formats a message as a JSON object
   * @property {Function} TextMessage operator: formats a message as a text message
   */
  return {
    fromTopic,
    toTopic,
    JSONMessage: jsonMessage,
    TextMessage: kafkaMessage,
  };
}
// Static API: lets callers use the module without invoking the factory first.
// The observables come from a factory created with no preset options or
// client adapter, so `opts` and `client` are undefined unless passed per call.
const unconfigured = KafkaObservable();

Object.assign(KafkaObservable, {
  // observables
  fromTopic: unconfigured.fromTopic,
  toTopic: unconfigured.toTopic,
  // operators
  JSONMessage: jsonMessage,
  TextMessage: kafkaMessage,
});

module.exports = KafkaObservable;