-
-
Notifications
You must be signed in to change notification settings - Fork 587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Need replacement for kafka transporter #1200
Comments
I wrote a custom transporter based on /*
* moleculer
* Copyright (c) 2019 MoleculerJS (https://github.com/moleculerjs/moleculer)
* MIT Licensed
*/
"use strict";
const { defaultsDeep } = require("lodash");
const Transporter = require("moleculer").Transporters.Base;
const C = require("./constants");
/**
* Lightweight transporter for Kafka
*
* For test:
* 1. clone https://github.com/wurstmeister/kafka-docker.git repo
* 2. follow instructions on https://github.com/wurstmeister/kafka-docker#pre-requisites
* 3. start containers with Docker Compose
*
* docker-compose -f docker-compose-single-broker.yml up -d
*
* @class KafkaTransporter
* @property {ServiceBroker} broker
* @property {GenericObject} opts
* @property {LoggerInstance} logger
* @property {boolean} connected
* @extends {Transporter}
*/
class KafkaTransporter extends Transporter {
	/**
	 * Creates an instance of KafkaTransporter.
	 *
	 * @param {string|Object} [opts] Either a connection string
	 *   (`kafka://host1:9092,host2:9092`) or an options object. Recognised
	 *   keys: `brokers` (string or array), `client`, `producer`, `consumer`
	 *   and `publish`.
	 *
	 * @memberof KafkaTransporter
	 */
	constructor(opts) {
		if (typeof opts === "string") {
			// A connection string may list several brokers separated by commas.
			const brokers = opts
				.replace("kafka://", "")
				.split(",")
				.map(broker => broker.trim())
				.filter(Boolean);
			opts = { brokers };
		} else if (opts == null) {
			opts = {};
		}

		// Only derive a default broker list when one was actually configured;
		// otherwise the client would be created with `[undefined]`.
		const clientDefaults = {};
		if (opts.brokers != null) {
			clientDefaults.brokers = Array.isArray(opts.brokers) ? opts.brokers : [opts.brokers];
		}

		opts = defaultsDeep(opts, {
			// KafkaClient options. More info: https://kafka.js.org/docs/configuration
			client: clientDefaults,
			// KafkaProducer options. More info: https://kafka.js.org/docs/producing#options
			producer: {},
			// ConsumerGroup options. More info: https://kafka.js.org/docs/consuming#a-name-options-a-options
			consumer: {},
			// Advanced options for `send`. More info: https://kafka.js.org/docs/producing#producing-messages
			publish: {
				partition: 0,
				attributes: 0
			}
		});

		super(opts);

		this.client = null;
		this.producer = null;
		this.consumer = null;
		this.admin = null;
	}

	/**
	 * Connect to the server.
	 *
	 * Creates the KafkaJS client, connects the admin client and the producer,
	 * then calls `onConnected()`. The consumer is created later, in
	 * `makeSubscriptions()`.
	 *
	 * @returns {Promise<void>} Rejects when the admin client, the producer or
	 *   `onConnected()` fails.
	 *
	 * @memberof KafkaTransporter
	 */
	async connect() {
		let Kafka;
		try {
			Kafka = require("kafkajs").Kafka;
		} catch (err) {
			/* istanbul ignore next */
			this.broker.fatal(
				"The 'kafkajs' package is missing. Please install it with 'npm install kafkajs' command.",
				err,
				true
			);
			// Do not continue with an undefined `Kafka` if `fatal` returns.
			/* istanbul ignore next */
			throw err;
		}

		this.client = new Kafka(this.opts.client);
		this.producer = this.client.producer(this.opts.producer);
		this.admin = this.client.admin();

		try {
			await this.admin.connect();
			await this.producer.connect();
		} catch (err) {
			this.logger.error("Kafka Producer error", err.message);
			this.logger.debug(err);
			this._broadcastError(err, C.FAILED_PUBLISHER_ERROR);
			throw err;
		}

		this.logger.info("Kafka client is connected.");
		// Awaited so that a failure inside `onConnected()` rejects this call
		// instead of becoming an unhandled rejection.
		await this.onConnected();
	}

	/**
	 * Disconnect from the server.
	 *
	 * Disconnects the consumer, the producer and the admin client (whichever
	 * exist) and clears the references to them.
	 *
	 * @returns {Promise<void>} Resolves once every client has disconnected.
	 *
	 * @memberof KafkaTransporter
	 */
	async disconnect() {
		const clients = [this.consumer, this.producer, this.admin].filter(Boolean);

		// Clear the references first so `send()` stops using the producer
		// while the disconnects are in flight.
		this.consumer = null;
		this.producer = null;
		this.admin = null;

		// KafkaJS `disconnect()` returns a promise and takes no callback.
		await Promise.all(clients.map(client => client.disconnect()));
	}

	/**
	 * Subscribe to all topics.
	 *
	 * Creates the topics through the admin client, then creates a consumer,
	 * subscribes it to the topics and starts consuming.
	 *
	 * @param {Array<Object>} topics List of `{ cmd, nodeID }` pairs.
	 *
	 * @returns {Promise<void>} Rejects when topic creation or consumer
	 *   start-up fails.
	 *
	 * @memberof KafkaTransporter
	 */
	async makeSubscriptions(topics) {
		const topicNames = topics.map(({ cmd, nodeID }) => this.getTopicName(cmd, nodeID));
		const topicDefinitions = topicNames.map(topic => ({ topic }));

		try {
			await this.admin.createTopics({ topics: topicDefinitions });
		} catch (err) {
			/* istanbul ignore next */
			this.logger.error("Unable to create topics!", topicDefinitions, err);
			/* istanbul ignore next */
			this._broadcastError(err, C.FAILED_TOPIC_CREATION);
			/* istanbul ignore next */
			throw err;
		}

		try {
			// Only `groupId` is defaulted here; any other consumer option comes
			// from `opts.consumer`.
			this.consumer = this.client.consumer({
				groupId: this.broker.instanceID,
				...this.opts.consumer
			});

			await this.consumer.connect();
			// The subscription must be registered before `run()` is called.
			await this.consumer.subscribe({ topics: topicNames });

			// Ref: https://kafka.js.org/docs/consuming#a-name-each-message-a-eachmessage
			await this.consumer.run({
				eachMessage: async ({ topic, message }) => {
					// The command is taken from the second dot-separated segment
					// of the topic name.
					// NOTE(review): assumes the topic prefix contains no "." — confirm.
					const cmd = topic.split(".")[1];
					await this.receive(cmd, message.value);
				}
			});
		} catch (err) {
			/* istanbul ignore next */
			this.logger.error("Kafka Consumer error", err.message);
			/* istanbul ignore next */
			this.logger.debug(err);
			/* istanbul ignore next */
			this._broadcastError(err, C.FAILED_CONSUMER_ERROR);
			// Always reject: the caller would otherwise wait forever.
			/* istanbul ignore next */
			throw err;
		}
	}

	/**
	 * Send data buffer.
	 *
	 * @param {String} topic
	 * @param {Buffer} data
	 * @param {Object} meta Must contain the `packet` being sent.
	 *
	 * @returns {Promise<void>} Resolves immediately when the transporter is
	 *   not connected; rejects when the producer fails to send.
	 */
	async send(topic, data, { packet }) {
		// Nothing to send through before `connect()` or after `disconnect()`.
		if (!this.client || !this.producer) return;

		try {
			await this.producer.send({
				topic: this.getTopicName(packet.type, packet.target),
				// Ref: https://kafka.js.org/docs/producing#message-structure
				messages: [{ value: data, partition: this.opts.publish.partition }],
				// An object here adds its keys to the `send` request; the default
				// `0` spreads to nothing.
				...this.opts.publish.attributes
			});
		} catch (err) {
			this.logger.error("Publish error", err);
			this._broadcastError(err, C.FAILED_PUBLISHER_ERROR);
			throw err;
		}
	}

	/**
	 * Emit a local `$transporter.error` event for a transporter failure.
	 *
	 * @param {Error} err The error that occurred.
	 * @param {String} type One of the failure type constants from `C`.
	 */
	_broadcastError(err, type) {
		this.broker.broadcastLocal("$transporter.error", {
			error: err,
			module: "transporter",
			type
		});
	}
}
module.exports = KafkaTransporter; |
Cool, thanks for sharing. I will check it and if it looks good, I will use it in the next (0.15) branch. |
FYI: I am using a custom KafkaTransport implemented with KafkaJS. It is working, but the CPU usage is really high. There are some issues regarding this. The bad news is that KafkaJS is looking for maintainers. tulios/kafkajs#1603 The good news, as mentioned in tulios/kafkajs#1603 (comment), is that Confluent is now officially supporting a JavaScript library. |
Awesome. However, the Confluent solution uses the native `librdkafka` library. If kafkajs ends up without a maintainer, I will remove the Kafka transporter from the core and move it to a separate repo, so anybody can choose whether to use the kafkajs transporter or write their own based on another library. |
kafka-node is no longer actively maintained, so I think it's better to move to a new library for the Kafka transporter.
See:
kafka-node
The text was updated successfully, but these errors were encountered: