-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
129 lines (102 loc) · 4.52 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
""" Script to capture data from SwitchBot devices and save in csv file.
* refer to https://github.com/OpenWonderLabs/SwitchBotAPI-BLE
Quick start:
* Create venv and install library
$ python3 -m venv venv
$ source venv/bin/activate (Linux)
$ venv/Scripts/activate (Windows)
$ pip install bleak
* Specify the absolute path where data is saved in `LOG_DIRECTORY` variable
  The folder structure should be:
    LOG_DIRECTORY
      0123456789AB        # folder is created automatically; it is named after the device's MAC address
        20240801.csv      # one file per day
        20240802.csv
        ...
      123456789ABC
        20240801.csv
        20240802.csv
        ...
* Run
$ python main.py
$ sudo venv/bin/python main.py (if permission is denied)
"""
import os
import datetime
import asyncio
from bleak import BleakScanner
# [USER SETTING] absolute path of the data directory (one sub-folder per device is created inside it)
LOG_DIRECTORY = "C:/Users/user/Documents/switchBotLog"
# dictionary to store the previous value of each device
# key: device address (as passed to update_log_file), value: tuple of decoded readings
prev_val: dict[str, tuple] = {}
# dictionary to store the latest data arrival time of each device
# key: device address (as passed to update_log_file), value: datetime.datetime
prev_time: dict[str, datetime.datetime] = {}
def decode_plug_power(mf_data: bytes) -> float:
    """Decode the power reading from a plug's manufacturer data.

    The reading is a 15-bit big-endian integer in units of 0.1 W: the low
    7 bits of byte 10 are the high part and byte 11 is the low part.

    Args:
        mf_data: manufacturer-specific payload (without the company identifier).

    Returns:
        Power in watts.

    Raises:
        IndexError: if ``mf_data`` is shorter than 12 bytes.
    """
    # The top bit of byte 10 is not part of the reading, so it is masked off.
    raw_tenths = ((mf_data[10] & 0x7F) << 8) | mf_data[11]
    return raw_tenths / 10.0  # [W]
def decode_meter_temp_and_hum(mf_data: bytes) -> tuple[float, float]:
    """Decode temperature and humidity from a meter's manufacturer data.

    Layout used (offsets into ``mf_data``):
      * byte 8, low nibble : tenths of a degree
      * byte 9, bits 0-6   : whole degrees
      * byte 9, bit 7      : sign flag (set = positive, clear = negative)
      * byte 10, bits 0-6  : relative humidity

    Args:
        mf_data: manufacturer-specific payload (without the company identifier).

    Returns:
        ``(temperature, humidity)``.

    Raises:
        IndexError: if ``mf_data`` is shorter than 11 bytes.
    """
    temp_tenths = mf_data[8] & 0x0F
    temp_whole = mf_data[9] & 0x7F
    temp_sign = 1 if mf_data[9] & 0x80 else -1
    # The sign must apply to the whole magnitude, fraction included; applying
    # it to the integer part only turns -5.3 into -4.7. Working in integer
    # tenths also avoids float noise from 0.1 * n.
    temp = temp_sign * (temp_whole * 10 + temp_tenths) / 10
    hum = mf_data[10] & 0x7F
    return (temp, hum)
def is_value_different(devaddr, value) -> bool:
    """Return True when ``value`` differs from the last value stored for a device.

    Args:
        devaddr: device address used as the key into ``prev_val``.
        value: the newly decoded reading.

    Returns:
        True if nothing has been stored for ``devaddr`` yet, or if the stored
        value is not equal to ``value``; False otherwise.
    """
    previous = prev_val.get(devaddr)
    # Identity check for the "never seen" case; equality check for the data.
    return previous is None or previous != value
def is_day_different(devaddr, now) -> bool:
    """Return True when ``now`` falls on a different calendar day than the last arrival.

    Args:
        devaddr: device address used as the key into ``prev_time``.
        now: ``datetime.datetime`` of the current reading.

    Returns:
        True if no time has been stored for ``devaddr`` yet, or if the stored
        time and ``now`` are on different dates; False otherwise.
    """
    previous = prev_time.get(devaddr)
    if previous is None:
        return True
    # Compare full dates, not just the day-of-month: a device that reappears
    # on the same day number of another month must still count as a new day.
    return previous.date() != now.date()
def update_log_file(address: str, time: datetime.datetime, value):
    """Append a reading to the device's daily CSV file when it is worth logging.

    A row is written only when ``is_value_different`` or ``is_day_different``
    reports a change for this device. The row is ``HH:MM:SS.mmm`` followed by
    the comma-separated items of ``value``; the same row, prefixed with the
    address, is echoed to the console. The remembered value and time for the
    device are refreshed on every call.

    Args:
        address: device address; colons are removed to form the folder name.
        time: arrival time of the reading; selects the file (``YYYYMMDD.csv``).
        value: iterable of decoded readings, e.g. ``(12, 14)`` -> ``"12,14"``.
    """
    should_write = is_value_different(address, value) or is_day_different(address, time)
    if should_write:
        device_dir = os.path.join(LOG_DIRECTORY, address.replace(":",""))
        csv_name = time.strftime("%Y%m%d") + ".csv"  # eg) 20240801.csv
        os.makedirs(device_dir, exist_ok=True)  # first reading of a new device
        # %f yields microseconds; dropping the last three digits leaves milliseconds
        timestamp = time.strftime("%H:%M:%S.%f")[:-3]
        row = timestamp + "," + ",".join(map(str, value))
        with open(os.path.join(device_dir, csv_name), "a", encoding="utf-8") as csv_file:
            csv_file.write(row + "\n")
        print(address + "," + row)  # console log

    # remember this reading for the next comparison
    prev_val[address] = value
    prev_time[address] = time
async def main():
    """Scan for BLE advertisements and log readings from supported devices.

    Every advertisement carrying manufacturer data for company identifier
    2409 and the SwitchBot service data is inspected. The first service-data
    byte selects the device type: 0x54 / 0x77 are handled as meters
    (temperature, humidity) and 0x6a as a plug (power). Decoded readings are
    passed to ``update_log_file``. The coroutine waits on an event that is
    never set here, so scanning continues until the process is stopped.
    """
    # Bluetooth SIG company identifier of SwitchBot
    # (Woan Technology (Shenzhen) Co., Ltd.); key into manufacturer_data.
    company_id = 2409
    # Service-data UUID whose first byte carries the device type.
    service_uuid = '0000fd3d-0000-1000-8000-00805f9b34fb'
    stop_event = asyncio.Event()

    def callback(device, advertising_data):
        # data received time
        now = datetime.datetime.now()

        manufacturer_data_bytes = advertising_data.manufacturer_data.get(company_id)
        if manufacturer_data_bytes is None:
            return
        # Use .get(): an advertisement may carry other service data without
        # this UUID, and direct indexing would raise KeyError in the callback.
        service_data_bytes = advertising_data.service_data.get(service_uuid)
        if not service_data_bytes:
            return

        device_type = service_data_bytes[0]
        try:
            if device_type in (0x54, 0x77):
                # Meter device: temperature and humidity
                value = decode_meter_temp_and_hum(manufacturer_data_bytes)
            elif device_type == 0x6a:
                # Plug device: power
                value = (decode_plug_power(manufacturer_data_bytes), )
            else:
                return
        except IndexError:
            # Payload too short to contain the reading; skip this packet
            # instead of raising inside the scanner callback.
            return

        update_log_file(device.address, now, value)

    async with BleakScanner(callback):
        await stop_event.wait()
# Script entry point: run the scanner coroutine on a new event loop.
# main() waits on an event that nothing in this file sets, so this call
# blocks until the process is stopped.
asyncio.run(main())