-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
64 lines (51 loc) · 1.72 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// Configuration setup for connection
// dotenv copies the variables from a local .env file (when present) into
// process.env, so it has to run before any process.env lookup below.
require('dotenv').config();
const mqtt = require("mqtt");
const Processor = require('./processor');

// Broker URL. BROKER_HOST may be set to point at a different broker; when it
// is unset or empty the original endpoint is used.
const host = process.env.BROKER_HOST || "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883";

// Credentials are read from the environment (BROKER_USR / BROKER_PWD).
// The original file carried "covid-public-client" / "covid19" as
// commented-out alternatives; supply them through the environment instead of
// editing this file.
const config = {
  username: process.env.BROKER_USR,
  password: process.env.BROKER_PWD,
};

// Topic filter the client subscribes to; "#" is the MQTT multi-level wildcard.
const topic_sub = "com/covidtracking/states/current/update/#";
// Prefix for outgoing topics; the message handler appends the record's state.
const topic_pub = "com/covidtracking/states/current/percent/update/";
// Not referenced anywhere else in this file.
const topic_demo = "demo/test";

// Connect to the broker
const client = mqtt.connect(host, config);
// Setup callback functions
// on connect
// Subscribes to topic_sub each time the client reports a successful
// connection. The subscribe callback surfaces failures that would otherwise
// go unnoticed and leave the process running without receiving any data.
client.on("connect", () => {
  console.log("Connected! 💥");
  client.subscribe(topic_sub, (err) => {
    if (err) {
      console.error(`Failed to subscribe to ${topic_sub}`, err);
    }
  });
});
// on message
// Parses each incoming payload as JSON, hands it to Processor.process and
// republishes the result on topic_pub + <state>.
// A single malformed or incomplete record is logged and skipped rather than
// thrown, because an exception escaping this handler would terminate the
// whole process.
client.on("message", (topic, message) => {
  let res;
  try {
    res = JSON.parse(message.toString());
  } catch (err) {
    console.error(`Ignoring message on topic ${topic}: payload is not valid JSON`, err);
    return;
  }
  console.log("Recieved");

  // The outgoing topic is built from res.state, so a record without it
  // cannot be routed (it would otherwise be published to ".../undefined").
  const state = res == null ? undefined : res.state;
  if (state == null || state === "") {
    console.error(`Ignoring message on topic ${topic}: payload has no "state" field`);
    return;
  }

  // Modify the stream
  // Processor.process lives in ./processor (not visible from this file). An
  // earlier inline version, commented out in the original, copied state,
  // totalTestResults, death, negative and positive and added
  // percentPositive / percentNegative as positive / negative divided by
  // totalTestResults.
  let payload;
  try {
    payload = Buffer.from(JSON.stringify(Processor.process(res)));
  } catch (err) {
    console.error(`Ignoring message on topic ${topic}: processing failed`, err);
    return;
  }

  console.log(" Publishing to topic! 🚀");
  const target = topic_pub.concat(state);
  client.publish(target, payload, (err) => {
    if (err) {
      console.error(`Failed to publish to ${target}`, err);
    }
  });
});
// Logs that the connection to the broker has ended.
// MQTT.js emits 'disconnect' only when the broker sends a DISCONNECT packet
// (an MQTT 5.0 feature), so on its own it never fires for an ordinary dropped
// connection. 'close' is emitted after any disconnection, which is why the
// same handler is registered for both events.
function handleDisconnected() {
  console.log("DISCONNECTED! 🤔");
  // Logic handling disconnection
}
client.on('disconnect', handleDisconnected);
client.on('close', handleDisconnected);
// Reports any error emitted by the MQTT client on the console.
function reportClientError(err) {
  console.log("Error! 😱\n", err);
  // Logic handling errors
}
client.on('error', reportClientError);
// Consume and process the stream of data
// Publish back to modified stream