-
Notifications
You must be signed in to change notification settings - Fork 0
/
broker.js
87 lines (72 loc) · 2.31 KB
/
broker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const zeromq = require("zeromq");
const async = require("async");
const ServiceBalancer = require(__dirname + "/service_balancer");
const Protocol = require(__dirname + "/protocol");
function Broker(opt) {
if(opt === undefined) {
throw new Error("opt must be defined");
}
if(opt.bind_address === undefined) {
throw new Error("opt.bind_address must be defined");
}
if(opt.services === undefined) {
throw new Error("opt.services must be defined");
}
this.log = opt.log;
this.services = createServices(opt.services);
this.router = createRouter(opt.bind_address);
this.queue_task = async.priorityQueue((task, callback) => {
if(task.type === "input") {
const [identity, delimiter, data] = task.frames;
const req = Protocol.decodeReq(data);
this.log && this.log("broker-receive", req);
this.services[req.header.service_name].send(task.frames);
}
else if(task.type === "output") {
const [identity, delimiter, data] = task.frames;
const req = Protocol.decodeReq(data);
this.log && this.log("broker-reply", req);
this.router.send(task.frames);
}
process.nextTick(callback);
})
}
Broker.prototype.listen = function(callback) {
Object.keys(this.services).forEach((name) => {
const service = this.services[name];
service.onMessage((...frames) => {
this.queue_task.push({
type: "output",
frames
}, 1);
});
service.listen();
});
this.router.on("message", (...frames) => {
this.queue_task.push({
type: "input",
frames
}, 2);
});
callback && callback();
};
Broker.prototype.close = function() {
Object.keys(this.services).forEach((name) => {
this.services[name].close();
});
if(this.router) {
this.router.close();
this.router = undefined;
}
};
const createRouter = function(bind_address) {
return zeromq.socket("router").bind(bind_address);
};
const createServices = function(services) {
const results = {};
services.forEach((service) => {
results[service.name] = new ServiceBalancer(service.upstream);
});
return results;
};
module.exports = Broker;