forked from kevinmcaleer/weatherhat_to_mqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wh.py
245 lines (182 loc) · 7.14 KB
/
wh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import math
import select
import threading
import time
import gpiod
import gpiodevice
import ioexpander as io
from bme280 import BME280
from gpiod.line import Bias, Edge
from ltr559 import LTR559
from smbus2 import SMBus
from .history import wind_degrees_to_cardinal
__version__ = '1.0.0'
# Wind Vane
PIN_WV = 8 # P0.3 ANE6
# Anemometer
PIN_ANE1 = 5 # P0.0
PIN_ANE2 = 6 # P0.1
ANE_RADIUS = 7 # Radius from center to the center of a cup, in CM
ANE_CIRCUMFERENCE = ANE_RADIUS * 2 * math.pi
ANE_FACTOR = 2.18 # Anemometer factor
# Rain gauge
PIN_R2 = 3 # P1.2
PIN_R3 = 7 # P1.1
PIN_R4 = 2 # P1.0
PIN_R5 = 1 # P1.5
RAIN_MM_PER_TICK = 0.2794
wind_direction_to_degrees = {
0.9: 0,
2.0: 45,
3.0: 90,
2.8: 135,
2.5: 180,
1.5: 225,
0.3: 270,
0.6: 315
}
class WeatherHAT:
def __init__(self):
self.updated_wind_rain = False
self._interrupt_pin = 4
self._lock = threading.Lock()
self._i2c_dev = SMBus(1)
self._bme280 = BME280(i2c_dev=self._i2c_dev)
self._ltr559 = LTR559(i2c_dev=self._i2c_dev)
self._ioe = io.IOE(i2c_addr=0x12)
self._chip = gpiodevice.find_chip_by_platform()
self._int = self._chip.request_lines(
consumer="weatherhat",
config={
self._interrupt_pin: gpiod.LineSettings(
edge_detection=Edge.FALLING, bias=Bias.PULL_UP
)
}
)
# Input voltage of IO Expander, this is 3.3 on Breakout Garden
self._ioe.set_adc_vref(3.3)
# Wind Vane
self._ioe.set_mode(PIN_WV, io.ADC)
# Anemometer
self._ioe.set_mode(PIN_ANE1, io.OUT)
self._ioe.output(PIN_ANE1, 0)
self._ioe.set_pin_interrupt(PIN_ANE2, True)
self._ioe.setup_switch_counter(PIN_ANE2)
# Rain Sensor
self._ioe.set_mode(PIN_R2, io.IN_PU)
self._ioe.set_mode(PIN_R3, io.OUT)
self._ioe.set_mode(PIN_R4, io.IN_PU)
self._ioe.setup_switch_counter(PIN_R4)
self._ioe.set_mode(PIN_R5, io.IN_PU)
self._ioe.output(PIN_R3, 0)
self._ioe.set_pin_interrupt(PIN_R4, True)
# Data API... kinda
self.temperature_offset = -7.5
self.device_temperature = 0.0
self.temperature = 0.0
self.pressure = 0.0
self.humidity = 0.0
self.relative_humidity = 0.0
self.dewpoint = 0.0
self.lux = 0.0
self.wind_speed = 0.0
self.wind_direction = 0.0
self.rain = 0.0
self.rain_total = 0.0
self.reset_counts()
self._poll_thread = threading.Thread(target=self._t_poll_ioexpander)
self._poll_thread.start()
self._ioe.enable_interrupt_out()
self._ioe.clear_interrupt()
def __del__(self):
self._polling = False
self._poll_thread.join()
def reset_counts(self):
self._lock.acquire(blocking=True)
self._ioe.clear_switch_counter(PIN_ANE2)
self._ioe.clear_switch_counter(PIN_R4)
self._lock.release()
self._wind_counts = 0
self._rain_counts = 0
self._last_wind_counts = 0
self._last_rain_counts = 0
self._t_start = time.time()
def compensate_humidity(self, humidity, temperature, corrected_temperature):
"""Compensate humidity.
Convert humidity to relative humidity.
"""
dewpoint = self.get_dewpoint(humidity, temperature)
corrected_humidity = 100 - (5 * (corrected_temperature - dewpoint)) - 20
return min(100, max(0, corrected_humidity))
def get_dewpoint(self, humidity, temperature):
"""Calculate Dewpoint."""
return temperature - ((100 - humidity) / 5)
def hpa_to_inches(self, hpa):
"""Convert hectopascals to inches of mercury."""
return hpa * 0.02953
def degrees_to_cardinal(self, degrees):
value, cardinal = min(wind_degrees_to_cardinal.items(), key=lambda item: abs(item[0] - degrees))
return cardinal
def _t_poll_ioexpander(self):
self._polling = True
poll = select.poll()
poll.register(self._int.fd, select.POLLIN)
while self._polling:
if not poll.poll(10):
continue
for event in self._int.read_edge_events():
if event.line_offset == self._interrupt_pin:
self.handle_ioe_interrupt()
time.sleep(1.0 / 100)
def update(self, interval=60.0):
# Time elapsed since last update
delta = float(time.time() - self._t_start)
self.updated_wind_rain = False
# Always update TPHL & Wind Direction
self._lock.acquire(blocking=True)
self.device_temperature = self._bme280.get_temperature()
self.temperature = self.device_temperature + self.temperature_offset
self.pressure = self._bme280.get_pressure()
self.humidity = self._bme280.get_humidity()
self.relative_humidity = self.compensate_humidity(self.humidity, self.device_temperature, self.temperature)
self.dewpoint = self.get_dewpoint(self.humidity, self.device_temperature)
self.lux = self._ltr559.get_lux()
self.wind_direction_raw = self._ioe.input(PIN_WV)
self._lock.release()
value, self.wind_direction = min(wind_direction_to_degrees.items(), key=lambda item: abs(item[0] - self.wind_direction_raw))
# Don't update rain/wind data until we've sampled for long enough
if delta < interval:
return
self.updated_wind_rain = True
rain_hz = self._rain_counts / delta
wind_hz = self._wind_counts / delta
self.rain_total = self._rain_counts * RAIN_MM_PER_TICK
self.reset_counts()
# wind speed of 2.4km/h causes the switch to close once per second
wind_hz /= 2.0 # Two pulses per rotation
wind_cms = wind_hz * ANE_CIRCUMFERENCE * ANE_FACTOR
self.wind_speed = wind_cms / 100.0
self.rain = rain_hz * RAIN_MM_PER_TICK
def handle_ioe_interrupt(self):
self._lock.acquire(blocking=True)
self._ioe.clear_interrupt()
wind_counts, _ = self._ioe.read_switch_counter(PIN_ANE2)
rain_counts, _ = self._ioe.read_switch_counter(PIN_R4)
# If the counter value is *less* than the previous value
# then we know the 7-bit switch counter overflowed
# We bump the count value by the lost counts between last_wind and 128
# since at 127 counts, one more count will overflow us back to 0
if wind_counts < self._last_wind_counts:
self._wind_counts += 128 - self._last_wind_counts
self._wind_counts += wind_counts
else:
self._wind_counts += wind_counts - self._last_wind_counts
self._last_wind_counts = wind_counts
if rain_counts < self._last_rain_counts:
self._rain_counts += 128 - self._last_rain_counts
self._rain_counts += rain_counts
else:
self._rain_counts += rain_counts - self._last_rain_counts
self._last_rain_counts = rain_counts
# print(wind_counts, rain_counts, self._wind_counts, self._rain_counts)
self._lock.release()