This repository has been archived by the owner on Jun 3, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
MQTTClient.py
80 lines (66 loc) · 2.95 KB
/
MQTTClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import logging
import multiprocessing
import time
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
class MQTTClient(multiprocessing.Process):
    """Worker process bridging two queues and an MQTT broker.

    Outgoing: task dicts taken from ``messageQ`` whose ``method`` is
    ``'publish'`` are published to ``<prefix>/<deviceId>/<param>``.

    Incoming: messages received on ``<prefix>/+/+/set`` are converted into
    command dicts and put on ``commandQ``.
    """

    def __init__(self, messageQ, commandQ, config):
        """Create the MQTT client and open the broker connection.

        Args:
            messageQ: queue of outgoing task dicts, drained by ``run``.
            commandQ: queue that receives command dicts built from incoming
                MQTT messages.
            config: mapping providing ``mqtt_prefix``, ``mqtt_host``,
                ``mqtt_port`` and ``mqtt_message_timeout`` (compared against
                ``time.time()`` differences, i.e. seconds).

        Raises:
            KeyError: if a required key is missing from ``config``.
        """
        multiprocessing.Process.__init__(self)
        self.logger = logging.getLogger('Max!-MQTT.MQTTClient')
        self.logger.info("Starting...")

        self.messageQ = messageQ
        self.commandQ = commandQ
        self.mqttDataPrefix = config['mqtt_prefix']
        self.mqtt_host = config['mqtt_host']
        self.mqtt_port = config['mqtt_port']
        self.message_timeout = config['mqtt_message_timeout']

        self._mqttConn = mqtt.Client(
            client_id='Max!-MQTT', clean_session=True, userdata=None)
        # Register the callbacks before connecting so no event can be missed.
        self._mqttConn.on_connect = self._on_connect
        self._mqttConn.on_disconnect = self._on_disconnect
        self._mqttConn.on_publish = self._on_publish
        self._mqttConn.on_message = self._on_message
        self._mqttConn.connect(
            self.mqtt_host, port=self.mqtt_port, keepalive=120)

    def close(self):
        """Disconnect from the broker."""
        self.logger.info("Closing connection")
        self._mqttConn.disconnect()

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """Subscribe to the command topics after every successful connect.

        The client is created with ``clean_session=True``, so the broker
        keeps no subscriptions across connections; subscribing here instead
        of once at start-up restores them after ``_on_disconnect`` has
        reconnected.
        """
        if rc != 0:
            self.logger.error("Connection refused by broker (rc=%s).", rc)
            return
        self._mqttConn.subscribe(self.mqttDataPrefix + "/+/+/set")

    def _on_disconnect(self, client, userdata, rc):
        """Reconnect when the connection was lost unexpectedly (rc != 0)."""
        if rc != 0:
            self.logger.error("Unexpected disconnection.")
            self._mqttConn.reconnect()

    def _on_publish(self, client, userdata, mid):
        """Log the id of a message whose publication completed."""
        self.logger.debug("Message %s published.", mid)

    def _on_message(self, client, userdata, message):
        """Turn an incoming MQTT message into a command on ``commandQ``.

        The topic is expected to look like ``<prefix>/<deviceId>/<param>/...``.
        Messages with a shorter topic or a payload that is not ASCII are
        logged and skipped instead of raising out of the network loop.
        Retained messages are cleared from the broker afterwards.
        """
        self.logger.debug("Message received: %s", message)

        topic = message.topic
        prefix = self.mqttDataPrefix + "/"
        # Strip only the leading prefix; a device id or parameter that
        # happens to contain the prefix text must stay intact.
        relative = topic[len(prefix):] if topic.startswith(prefix) else topic
        parts = relative.split("/")

        if len(parts) < 2:
            self.logger.warning(
                "Ignoring message on unexpected topic %s", topic)
        else:
            try:
                payload = message.payload.decode('ascii')
            except UnicodeDecodeError:
                self.logger.warning(
                    "Ignoring non-ASCII payload on topic %s", topic)
            else:
                self.commandQ.put({
                    'method': 'command',
                    'topic': topic,
                    'deviceId': parts[0],
                    'param': parts[1],
                    'payload': payload,
                    'qos': 1,
                    'timestamp': time.time(),
                })

        if message.retain != 0:
            # An empty retained publish removes the retained message so the
            # command is not delivered again on the next subscription.
            # NOTE(review): this clearing publish is presumably delivered to
            # our own subscription too, as an empty-payload message; confirm
            # that the consumer of commandQ tolerates that.
            self._mqttConn.publish(topic, None, 1, True)
            self.logger.info("Clearing topic " + topic)

    def publish(self, task):
        """Publish one task to ``<prefix>/<deviceId>/<param>``.

        A task older than ``message_timeout`` is dropped. A task whose
        payload is ``None`` is skipped. If publishing fails, the task is put
        back on ``messageQ`` so it is retried until it expires.

        Args:
            task: dict with ``timestamp``, ``deviceId``, ``param`` and
                ``payload`` keys.
        """
        age = time.time() - task['timestamp']
        if age > self.message_timeout:
            # Without this cut-off a task that keeps failing would be
            # requeued forever.
            self.logger.warning('Dropping expired message: %s', task)
            return

        topic = "%s/%s/%s" % (
            self.mqttDataPrefix, task['deviceId'], task['param'])
        try:
            if task['payload'] is not None:
                publish.single(
                    topic,
                    hostname=self.mqtt_host,
                    port=self.mqtt_port,
                    payload=task['payload'])
                self.logger.debug('Sending:%s', task)
        except Exception as e:
            # Broad on purpose: any failure while publishing (network,
            # protocol, bad value) leads to the same retry.
            self.logger.error('Publish problem: %s', e)
            self.messageQ.put(task)

    def run(self):
        """Process entry point: publish queued tasks and service MQTT forever.

        The subscription to the command topics is made in ``_on_connect``,
        which is invoked from ``loop()`` once the broker acknowledges the
        connection.
        """
        while True:
            while not self.messageQ.empty():
                task = self.messageQ.get()
                if task['method'] == 'publish':
                    self.publish(task)
            time.sleep(0.01)
            self._mqttConn.loop()