-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcli.py
92 lines (64 loc) · 2.26 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import logging
import sys
from typing import List
import aioconsole
from bleak.backends.device import BLEDevice
from melnor_bluetooth.device import Device
from melnor_bluetooth.scanner import scanner
from melnor_bluetooth.utils.formatter import CustomFormatter
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().handlers[0].setFormatter(CustomFormatter())
logging.getLogger("bleak").setLevel(logging.WARNING)
devices: List[Device] = []
_LOGGER = logging.getLogger(__name__)
def detection_callback(ble_device: BLEDevice):
address = ble_device.address
has_device = [d for d in devices if d.mac == address]
if len(has_device) == 0:
device = Device(ble_device)
devices.append(device)
_LOGGER.info("Found device %s", device.mac)
async def main():
await scanner(detection_callback, scan_timeout_seconds=10)
if len(devices) == 0:
_LOGGER.warning("No devices found")
return
device = devices[0]
await device.connect()
while True:
await device.fetch_state()
# Take user input
line = await aioconsole.ainput(
"Format command as [zone, state, minutes] i.e. '1, on, 10'"
+ "\nOr 'd' to disconnect:\n"
)
if line != "d":
args = line.split(",")
zone = args[0]
minutes = None
state = None
if len(args) > 1:
state = args[1]
if len(args) > 2:
minutes = int(args[2])
valve = device[f"zone{zone}"]
if valve is None:
_LOGGER.warning("Zone %s not found", zone)
continue
_LOGGER.info("Setting zone %s to %s for %s minutes", zone, state, minutes)
if minutes is not None:
await valve.set_manual_watering_minutes(int(1))
if state is None:
await valve.set_is_watering(not valve.is_watering)
else:
await valve.set_is_watering(state == "on")
await device.push_state()
elif line == "d":
await device.disconnect()
sys.exit(0)
else:
_LOGGER.error("Invalid command")
await device.push_state()
continue
asyncio.run(main())