-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDAI.py
123 lines (107 loc) · 4.16 KB
/
DAI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import re, time, json, threading, requests, traceback
from datetime import datetime
import paho.mqtt.client as mqtt
import DAN, SA
def df_func_name(df_name):
return re.sub(r'-', r'_', df_name)
MQTT_broker = getattr(SA,'MQTT_broker', None)
MQTT_port = getattr(SA,'MQTT_port', 1883)
MQTT_User = getattr(SA,'MQTT_User', None)
MQTT_PW = getattr(SA,'MQTT_PW', None)
MQTT_encryption = getattr(SA,'MQTT_encryption', None)
device_model = getattr(SA,'device_model', None)
device_name = getattr(SA,'device_name', None)
ServerURL = getattr(SA,'ServerURL', None)
device_id = getattr(SA,'device_id', None)
if device_id==None: device_id = DAN.get_mac_addr()
IDF_list = getattr(SA,'IDF_list', [])
ODF_list = getattr(SA,'ODF_list', [])
exec_interval = getattr(SA,'exec_interval', 1)
IDF_funcs = {}
for idf in IDF_list:
IDF_funcs[idf] = getattr(SA, df_func_name(idf), None)
ODF_funcs = {}
for odf in ODF_list:
ODF_funcs[odf] = getattr(SA, df_func_name(odf), None)
def on_connect(client, userdata, flags, rc):
if not rc:
print('MQTT broker: {}'.format(MQTT_broker))
if ODF_list == []:
print('ODF_list is not exist.')
return
topic_list=[]
for odf in ODF_list:
topic = '{}//{}'.format(device_id, odf)
topic_list.append((topic,0))
if topic_list != []:
r = client.subscribe(topic_list)
if r[0]: print('Failed to subscribe topics. Error code:{}'.format(r))
else: print('Connect to MQTT borker failed. Error code:{}'.format(rc))
def on_disconnect(client, userdata, rc):
print('MQTT Disconnected. Re-connect...')
client.reconnect()
def on_message(client, userdata, msg):
samples = json.loads(msg.payload)
ODF_name = msg.topic.split('//')[1]
if ODF_funcs.get(ODF_name):
ODF_data = samples['samples'][0][1]
ODF_funcs[ODF_name](ODF_data)
else:
print('ODF function "{}" is not existed.'.format(ODF_name))
def mqtt_pub(client, deviceId, IDF, data):
topic = '{}//{}'.format(deviceId, IDF)
sample = [str(datetime.today()), data]
payload = json.dumps({'samples':[sample]})
status = client.publish(topic, payload)
if status[0]: print('topic:{}, status:{}'.format(topic, status))
def on_register(result):
func = getattr(SA, 'on_register', None)
if func: func(result)
def MQTT_config(client):
client.username_pw_set(MQTT_User, MQTT_PW)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
if MQTT_encryption: client.tls_set()
client.connect(MQTT_broker, MQTT_port, keepalive=60)
DAN.profile['dm_name'] = device_model
DAN.profile['df_list'] = IDF_list + ODF_list
if device_name: DAN.profile['d_name']= device_name
if MQTT_broker: DAN.profile['mqtt_enable'] = True
result = DAN.device_registration_with_retry(ServerURL, device_id)
on_register(result)
if MQTT_broker:
mqttc = mqtt.Client()
MQTT_config(mqttc)
mqttc.loop_start()
while True:
try:
for idf in IDF_list:
if not IDF_funcs.get(idf):
print('IDF function "{}" is not existed.'.format(idf))
continue
IDF_data = IDF_funcs.get(idf)()
if not IDF_data: continue
if type(IDF_data) is not tuple: IDF_data=[IDF_data]
if MQTT_broker: mqtt_pub(mqttc, device_id, idf, IDF_data)
else: DAN.push(idf, IDF_data)
time.sleep(0.001)
if not MQTT_broker:
for odf in ODF_list:
if not ODF_funcs.get(odf):
print('ODF function "{}" is not existed.'.format(odf))
continue
ODF_data = DAN.pull(odf)
if not ODF_data: continue
ODF_funcs.get(odf)(ODF_data)
time.sleep(0.001)
except Exception as e:
if str(e).find('mac_addr not found:') != -1:
print('Reg_addr is not found. Try to re-register...')
DAN.device_registration_with_retry(ServerURL, device_id)
else:
exception = traceback.format_exc()
print(exception)
if MQTT_broker: mqttc.reconnect()
time.sleep(1)
time.sleep(exec_interval)