-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwater_system.py
50 lines (40 loc) · 1.32 KB
/
water_system.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import RPi.GPIO as GPIO
import threading
import time
from data_generator import DataGenerator
from lib.switch import Switch
from lib.File import File
DEBUG = False
class Water_System:
# Pins
push_button = 7
led = 11
path = "/home/pi/Desktop/Smart-Water-System/data/sensor_reading.csv"
def __init__(self):
self.lock = threading.Lock()
self.data_generator = DataGenerator(self)
self.status = 0
self.last_water_end_time = 0
def pi_setup(self):
GPIO.setmode(GPIO.BOARD)
def pi_cleanup(self):
GPIO.cleanup()
def write_data(self,lock):
if self.last_water_end_time == 0:
last_watered_time = 0
else:
last_watered_time = time.time() - self.last_water_end_time
with lock:
File(Water_System.path).write_row(self.data_generator.collect_data(self.status, last_watered_time))
def run(self):
self.pi_setup()
switch_obj = Switch(self, Water_System.push_button)
switch_obj.run(Water_System.led)
thread1 = threading.Thread(target=self.data_generator.monitor_data, name="thread1",
args=(self.lock,), daemon=True)
thread1.start()
try:
while True:
pass
except KeyboardInterrupt:
return