-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
219 lines (193 loc) · 7.75 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# Main
def runSafe(cmd, p1=None):
    """Call ``cmd`` and log, rather than propagate, any ordinary exception.

    Args:
        cmd: The callable to invoke.
        p1: Optional single positional argument. When it is ``None`` the
            callable is invoked with no arguments.

    Returns:
        Whatever ``cmd`` returns, or ``None`` if ``cmd`` raised an
        ``Exception``. Callers that need a container back must therefore
        cope with ``None``.

    Raises:
        KeyboardInterrupt: Always re-raised and never logged here.
    """
    try:
        # Identity test: an argument whose __eq__/__ne__ is overridden must
        # still be forwarded instead of being mistaken for "no argument".
        if p1 is not None:
            return cmd(p1)
        return cmd()
    except KeyboardInterrupt:
        raise
    except Exception as e:
        # SerialLog, sys and gc are module-level names bound by the start-up
        # script before the first call to runSafe.
        SerialLog.log("Exception", str(e))
        sys.print_exception(e)
        gc.collect()
        return None
# Start-up script: bring up the hardware, load the modules listed in
# profile.json, start them, then run the tick/telemetry/command loop forever.
# Any unhandled exception is printed and the board is reset after ~10s.
try:
    # Turn on the LED to show we are alive
    from board_system.cpu_hardware import CpuHardware
    CpuHardware.StatusLedOn()
    # set the CPU to max speed
    CpuHardware.SetCpuMaxSpeed()

    # Connect to wifi and web and check for updates
    from modules.wifi.wifi import WifiHandler
    wifi = WifiHandler()
    wifi.preStart()

    # some storage
    ledOn = True
    telemetry = {}
    allModules = [wifi]

    # Get the list of modules to load
    # NOTE: SerialLog, sys and gc must stay module-level names because
    # runSafe() reads them as globals.
    from serial_log import SerialLog
    SerialLog.log("Loading modules..")
    import ujson
    # The context manager closes the file even if reading fails.
    with open("profile.json", 'r') as f:
        settings_dict = ujson.loads(f.read())

    # load up all other modules
    import sys
    import gc
    SerialLog.log("Ram Total:", gc.mem_alloc() + gc.mem_free())
    SerialLog.log("Ram Free:", gc.mem_free())
    SerialLog.log("Ram Allocated:", gc.mem_alloc(), "(", int(gc.mem_alloc()*100/(gc.mem_alloc() + gc.mem_free())), "%)")

    # start Web processing
    from modules.web.web_processor import WebProcessor
    web = WebProcessor()
    allModules.append(web)

    # Imports are done lazily, per configured module, so that only the
    # modules named in profile.json are imported.
    for modname in settings_dict['activeModules']:
        ramfree = gc.mem_free()
        if modname == 'basic':
            pass
        elif modname == 'mqtt':
            from modules.mqtt.mqtt_control import MqttControl
            allModules.append(MqttControl())
        elif modname == 'homeassistant':
            from modules.homeassistant.homeassistant_control import HomeAssistantControl
            allModules.append(HomeAssistantControl())
        elif modname == 'thingsboard':
            from modules.thingsboard.thingsboard_control import ThingsboardControl
            allModules.append(ThingsboardControl())
        elif modname == 'web':
            pass  # preloaded
        elif modname == 'wifi':
            pass  # preloaded
        elif modname == 'ota':
            pass  # special and different
        elif modname == 'builtin_button':
            from modules.builtin_button.builtin_button_control import BuiltinButtonControl
            allModules.append(BuiltinButtonControl())
        elif modname == 'dht11':
            from modules.dht11.dht11 import Dht11Monitor
            allModules.append(Dht11Monitor())
        elif modname == 'dht22':
            from modules.dht22.dht22 import Dht22Monitor
            allModules.append(Dht22Monitor())
        elif modname == 'ds18b20temp':
            from modules.ds18b20temp.ds18b20_temp import DS18B20Temp
            allModules.append(DS18B20Temp())
        elif modname == 'temphistory':
            from modules.temphistory.temphistory import TempHistory
            allModules.append(TempHistory())
        elif modname == 'ntp':
            from modules.ntp.ntp import NtpSync
            allModules.append(NtpSync())
        elif modname == 'lilygo_battery':
            from modules.lilygo_battery.lilygo_battery import LilyGoBattery
            allModules.append(LilyGoBattery())
        elif modname == 'four_relay':
            from modules.four_relay.four_relay import FourRelay
            allModules.append(FourRelay())
        elif modname == 'four_button':
            from modules.four_button.four_button_control import FourButton
            allModules.append(FourButton())
        elif modname == "dion_hallway_lights":
            from modules.dion_hallway_lights.dion_hallway_lights import DionHallwayLightsControl
            allModules.append(DionHallwayLightsControl())
        elif modname == "pir":
            from modules.pir.pirDetect import PIRDetect
            allModules.append(PIRDetect())
        elif modname == "ac_remote":
            from modules.ac_remote.ac_remote import AcRemote
            allModules.append(AcRemote())
        elif modname == "binary_clock":
            from modules.binary_clock.binary_clock import BinaryClock
            allModules.append(BinaryClock())
        elif modname == "mosfet":
            from modules.mosfet.mosfet_control import MosfetControl
            allModules.append(MosfetControl())
        elif modname == "gps":
            from modules.gps.gps_control import GPSControl
            allModules.append(GPSControl())
        elif modname == "ledstrip":
            from modules.ledstrip.ledstrip_control import LedStripControl
            allModules.append(LedStripControl())
        elif modname == "us_range":
            from modules.us_range.us_range_sensor import USRangeSensor
            allModules.append(USRangeSensor())
        elif modname == "garage_door_closer":
            from modules.garage_door_closer.garage_door_control import GarageDoorControl
            allModules.append(GarageDoorControl())
        elif modname == "touchscreen":
            from modules.touchscreen.touchscreen import TouchScreen
            allModules.append(TouchScreen())
        elif modname == "wdt":
            from modules.wdt.wdt import WDTControl
            allModules.append(WDTControl())
        elif modname == "relay":
            from modules.relay.relay import Relay
            allModules.append(Relay())
        else:
            SerialLog.log("Error: Unsupported Module! ", modname)
        SerialLog.log("Completed loading ", modname, " Ram Used:", ramfree - gc.mem_free())
        SerialLog.log("Ram Allocated:", gc.mem_alloc(), "(", int(gc.mem_alloc()*100/(gc.mem_alloc() + gc.mem_free())), "%)")

    gc.collect()
    SerialLog.log("All code compiled. Ram Free:", gc.mem_free())

    # start all the modules up
    routes = {}
    panels = {}
    for mod in allModules:
        SerialLog.log("Starting: ", type(mod))
        runSafe(mod.start)
        # runSafe returns None when the call failed; fall back to an empty
        # dict so one broken module cannot abort start-up with a TypeError.
        routes.update(runSafe(mod.getRoutes) or {})
        panels.update(runSafe(mod.getIndexFileName) or {})

    SerialLog.log("Setting up Web Routes:")
    web.setRoutes(routes)
    web.setTelemetry(telemetry)
    web.setPanels(panels)

    # Switch off the led before the loop, it will flash if it needs to
    CpuHardware.StatusLedOff()

    gc.collect()
    SerialLog.log("All modules loaded. Ram Free:", gc.mem_free())
    SerialLog.log("Main loop starting...")

    while True:
        # tick all modules
        for mod in allModules:
            runSafe(mod.tick)

        # get all telemetry
        for mod in allModules:
            # merge all telemetry into the telemetry object
            newTel = runSafe(mod.getTelemetry)
            # None means the module failed (already logged by runSafe)
            if newTel is not None:
                runSafe(telemetry.update, newTel)

        # process all telemetry
        for mod in allModules:
            runSafe(mod.processTelemetry, telemetry)

        # get all commands
        commands = []
        for mod in allModules:
            # add all commands to the commands list; a failed call yields
            # None, which must not reach extend() or the board would reboot
            commands.extend(runSafe(mod.getCommands) or [])

        # process all commands (modules are called even when the list is empty)
        if commands:
            SerialLog.log("Commands: ", commands)
        for mod in allModules:
            runSafe(mod.processCommands, commands)

        # blink blue
        if web.getLedEnabled():
            ledOn = not ledOn
            if ledOn:
                CpuHardware.StatusLedOn()
            else:
                CpuHardware.StatusLedOff()
        else:
            if ledOn:
                CpuHardware.StatusLedOff()

except KeyboardInterrupt:
    raise
except Exception as e:
    # Imports are repeated here because the failure may have happened before
    # the try body got as far as importing them.
    from sys import print_exception
    from time import sleep
    from serial_log import SerialLog
    print_exception(e)
    SerialLog.log("Fatal exception, will reboot in 10s")
    for _ in range(100):  # lots of little sleeps, hopefully means repl can connect
        sleep(0.1)
    from machine import reset
    reset()