forked from forgemedia/uGraphics
-
Notifications
You must be signed in to change notification settings - Fork 0
/
socketHandler.js
168 lines (140 loc) · 5.49 KB
/
socketHandler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import { logger, io } from './server';
import FS from 'fs';
import _ from 'lodash';
import * as MechanismRegister from './mechanismRegister';
/** Feature flag: when true, a `:mechanism` message locks its data store before the mechanism runs */
const lockingEnabled = false;
/**
 * The central/master sync data store for all of the components,
 * keyed by component name
 * @type {Object}
 */
const dataStore = {};
/**
 * The component names being kept in sync; replaced by the handler's constructor.
 * Starts as an empty array (not an object) because every consumer walks it
 * with for...of, which throws on a plain object.
 * @type {Iterable<string>}
 */
let sockets = [];
/**
 * Lock flags keyed by component name; while a flag is truthy, incoming sync
 * messages for that component are queued instead of applied
 * @type {Object}
 */
const locks = {};
/**
 * Sync messages received while a component was locked, keyed by component name
 * @type {Object}
 */
const queues = {};
/**
 * Broadcasts a `synf` message for one component and logs what was sent
 * @param {string} socketName The name of the component
 * @param {Object} [delta] When truthy, sent in place of the component's stored state
 */
export let emitSynf = (socketName, delta) => {
    // A falsy delta means "send whatever is currently stored for this component"
    const payload = delta || dataStore[socketName];
    io.emit(`${socketName}:synf`, payload);

    const description = delta
        ? '(delta) ' + JSON.stringify(delta)
        : JSON.stringify(dataStore[socketName]);
    logger.silly(`Emitted ${socketName}:synf: ${description}`);
};
/**
 * Broadcasts a `trigger` message for one component and logs it
 * @param {string} socketName The name of the component
 * @param {Object} msg The payload sent along with the trigger
 */
let emitTrigger = (socketName, msg) => {
    const eventName = `${socketName}:trigger`;
    io.emit(eventName, msg);

    const serialised = JSON.stringify(msg);
    logger.debug(`Emitted ${eventName}: ${serialised}`);
};
/**
 * Broadcasts a `reply` message for one component and logs it
 * @param {string} socketName The name of the component
 * @param {Object} msg The payload sent along with the reply
 */
let emitReply = (socketName, msg) => {
    const eventName = `${socketName}:reply`;
    io.emit(eventName, msg);

    const serialised = JSON.stringify(msg);
    logger.debug(`Emitted ${eventName}: ${serialised}`);
};
/** Starts a repeating sixty-second timer that re-broadcasts the state of every component */
let setTick = () => {
    const tickIntervalMs = 60000;

    // One tick: note it in the log, then emit a sync event for each component in turn
    const onTick = () => {
        logger.verbose('Tick');
        for (const componentName of sockets) {
            emitSynf(componentName);
        }
    };

    setInterval(onTick, tickIntervalMs);
};
/**
 * Releases the lock on a data store and applies every sync message that was
 * queued while it was locked. Does nothing when the store is not locked.
 * @param {string} dataStoreId The name of the component whose store is unlocked
 */
let unlockDataStore = dataStoreId => {
    if (!locks[dataStoreId]) return;
    // A queue may not exist yet for this id; treat that as "nothing pending"
    const pending = queues[dataStoreId] || [];
    logger.debug(`Unlocking ${dataStoreId}, ${pending.length} messages to process`);
    // splice(0) empties the queue in place and returns the messages in arrival
    // order; _.assign applies its sources left to right, so the most recent
    // message wins for any key that was written more than once
    _.assign(dataStore[dataStoreId], ...pending.splice(0));
    locks[dataStoreId] = false;
    logger.debug(`Unlocked data store ${dataStoreId}`);
};
/** Debounced emitters created so far, keyed by component name */
const debouncedEmitters = new Map();
/**
 * Debounced variant of emitSynf (50 ms). Each component gets its own debounce
 * timer: a single shared debounced function only ever fires with the
 * arguments of its last call, so a burst touching several components would
 * emit for just the last one and silently drop the rest.
 * @param {string} socketName The name of the component
 * @param {Object} [delta] Passed through to emitSynf
 */
let debouncedEmitSynf = (socketName, delta) => {
    let emitter = debouncedEmitters.get(socketName);
    if (!emitter) {
        emitter = _.debounce(latestDelta => emitSynf(socketName, latestDelta), 50);
        debouncedEmitters.set(socketName, emitter);
    }
    emitter(delta);
};
/**
 * Builds the callback handed to a mechanism for one data store
 * @param {string} dataStoreId The name of the component the mechanism works on
 * @returns {Function} Callback taking `updateOnly`; when that is falsy the
 * store is unlocked first, and in every case a debounced sync is emitted
 */
let mechanismCallbackFactory = dataStoreId => updateOnly => {
    const suffix = updateOnly ? ', update only' : '';
    logger.silly(`Mechanism callback on ${dataStoreId}${suffix}`);

    const shouldUnlock = !updateOnly;
    if (shouldUnlock) {
        unlockDataStore(dataStoreId);
    }
    debouncedEmitSynf(dataStoreId);
};
/**
 * Server-side handler for the component sync protocol: it seeds the shared
 * data store and wires the per-component message handlers onto each
 * connecting socket.
 */
export default class SocketHandler {
    /**
     * Seeds the data store for every component, emits an initial sync for
     * each one and starts the periodic tick
     * @param {Iterable<string>} isockets The names of the components to handle
     * @param {Object} [initDataStore] Initial state keyed by component name;
     * components without an entry start with an empty object
     */
    constructor(isockets, initDataStore) {
        // Tolerate a missing initial store instead of throwing on the first lookup
        const initialState = initDataStore || {};
        sockets = isockets;
        for (let socketName of sockets) {
            dataStore[socketName] = initialState[socketName] || {};
            // Create each queue once, up front, so it exists before any client connects
            if (!queues[socketName]) queues[socketName] = [];
            emitSynf(socketName);
        }
        setTick();
    }

    /**
     * @param {string} id The name of the component
     * @returns {Object} The stored state for that component (undefined if unknown)
     */
    getStore(id) {
        return dataStore[id];
    }

    /**
     * @param {*} [id] Unused; kept so existing callers keep working
     * @returns {string[]} The names of all components that have a data store
     */
    listStores(id) {
        return _.keys(dataStore);
    }

    /**
     * Called whenever a socket needs to be handled
     * @param {socket} socket The socket that needs to be handled
     */
    handleSocket(socket) {
        // Get the IP address that's connecting and log it
        let address = socket.request.connection.remoteAddress;
        logger.verbose(`Connection from ${address}`);

        socket.on('telegram', msg => {
            logger.debug(`TELEGRAM: ${msg}`);
        });

        // For each configured component, register the handlers for its messages
        for (let socketName of sockets) {
            // Only create the queue when it is missing: resetting it on every
            // connection would throw away messages queued for other clients
            if (!queues[socketName]) queues[socketName] = [];

            // When a sync message is received...
            socket.on(`${socketName}:sync`, msg => {
                logger.silly(`Sync on ${socketName}:sync: ${JSON.stringify(msg)}`);
                // Apply it to the data store, or hold it back while the store is locked
                if (!locks[socketName]) _.assign(dataStore[socketName], msg);
                else {
                    logger.debug(`Queuing sync message for ${socketName}`);
                    queues[socketName].push(msg);
                }
                // Emit it again as a delta so other clients can pick it up
                emitSynf(socketName, msg);
            });

            // When a get message is received, emit the full stored state again
            socket.on(`${socketName}:get`, () => {
                logger.debug(`Get on ${socketName}:get`);
                emitSynf(socketName);
            });

            // When a trigger message is received, log it and pass it on
            socket.on(`${socketName}:trigger`, msg => {
                // The payload comes from the client and may be missing entirely;
                // read its fields defensively so logging cannot throw
                const { id, data } = msg || {};
                logger.debug(`Trig on ${socketName}:trigger: ${id}, ${JSON.stringify(data)}`);
                emitTrigger(socketName, msg);
            });

            // When a reply message is received, log it and pass it on
            socket.on(`${socketName}:reply`, msg => {
                const { id, data } = msg || {};
                logger.debug(`Rply on ${socketName}:reply: ${id}, ${JSON.stringify(data)}`);
                emitReply(socketName, msg);
            });

            // When a mechanism message is received, hand it to the mechanism register
            socket.on(`${socketName}:mechanism`, msg => {
                // Reject an empty payload before taking the lock, so a malformed
                // message can neither throw nor leave the store locked
                if (!msg) {
                    logger.error(`- Mechanism message on ${socketName}:mechanism has no payload`);
                    return;
                }
                if (lockingEnabled) {
                    logger.debug(`Mechanism: locking data store ${socketName}`);
                    locks[socketName] = true;
                }
                logger.debug(`Mech on ${socketName}:mechanism: ${msg.id}, ${JSON.stringify(msg.data)}`);
                // NOTE(review): when Handle reports an unknown mechanism, nothing here
                // releases a lock taken above - confirm whether Handle still invokes the callback
                if (!MechanismRegister.Handle(msg, dataStore[socketName], mechanismCallbackFactory(socketName))) logger.error(`- Mechanism ${msg.id} not found`);
            });
        }
    }
}