-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathble.py
217 lines (160 loc) · 4.87 KB
/
ble.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import os
import re
import time
import binascii
from bluepy import btle
from bluepy.btle import Scanner
from bluepy.btle import DefaultDelegate
# Shared bluepy Scanner; created by Central.__init__ with a ScanDelegate attached.
gBLEScanner = None
# Scan hits as [addr, addrType, rssi, value] lists, appended by
# ScanDelegate.handleDiscovery and reset by Central before each scan.
gBLEDevices = []
# Notification payloads appended by PeripheralDelegate.handleNotification;
# reset by Central.waitNotification before it starts waiting.
gBLENotify = []
#
# bluetooth device restart
#
def setupBLE():
    """Restart the system bluetooth service, then pause for two seconds.

    The restart is issued through ``sudo systemctl``; its exit status is
    not inspected.
    """
    restart_command = "sudo systemctl restart bluetooth"
    os.system(restart_command)
    # Fixed pause after the restart before the caller continues.
    time.sleep(2)
#
# scan delegate class
#
class ScanDelegate(DefaultDelegate):
    """Scan delegate that records advertisements carrying a UUID-shaped value.

    Every scan-data value of a newly discovered device that starts with an
    8-4-4-4-12 group of word characters is appended to the module-level
    ``gBLEDevices`` list as ``[addr, addrType, rssi, value]``.
    """

    # Raw string: "\w" inside a normal string literal is an invalid escape
    # sequence (a warning today, slated to become an error). Compiled once
    # here instead of being re-parsed for every scan-data entry.
    _UUID_PATTERN = re.compile(r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}")

    def __init__(self):
        DefaultDelegate.__init__(self)

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Record the UUID-shaped scan-data values of a newly seen device.

        Args:
            dev: scan entry providing ``addr``, ``addrType``, ``rssi`` and
                ``getScanData()``.
            isNewDev: true when the device has not been reported before;
                entries for already-known devices are ignored.
            isNewData: unused.
        """
        if not isNewDev:
            return
        for (_, _desc, value) in dev.getScanData():
            # match() anchors at the start only, so trailing text after the
            # UUID-shaped prefix is accepted, as before.
            if self._UUID_PATTERN.match(value):
                # Looked up at call time, so this always targets the list
                # most recently bound to the module-level name.
                gBLEDevices.append([dev.addr, dev.addrType, dev.rssi, value])
#
# peripheral delegate class
#
class PeripheralDelegate(DefaultDelegate):
    """Notification delegate that echoes each payload and queues it.

    No ``__init__`` override is needed: construction falls through to
    ``DefaultDelegate.__init__`` unchanged.
    """

    def handleNotification(self, cHandle, data):
        """Print ``data`` and append it to the module-level ``gBLENotify``.

        ``cHandle`` is accepted as part of the delegate callback signature
        but is not used.
        """
        print(data)
        gBLENotify.append(data)
#
# ble device class
#
class Device(object):
    """Plain record for one scan hit: address, address type, RSSI and UUID."""

    def __init__(self, addr, addrType, rssi, uuid):
        # Store all four values exactly as given; nothing is validated.
        (self.addr, self.addrType, self.rssi, self.uuid) = (
            addr,
            addrType,
            rssi,
            uuid,
        )
#
# ble Central class
#
class Central(object):
    """BLE central: scan for peripherals, connect, and exchange data.

    Scan results and notifications are exchanged with the delegates through
    the module-level ``gBLEScanner``, ``gBLEDevices`` and ``gBLENotify``
    globals.
    """

    # Connected btle.Peripheral, or None while disconnected.
    mPeripheral = None

    # Length in seconds of a single waitForNotifications() poll.
    _POLL_INTERVAL = 0.1

    def __init__(self):
        """Create the shared scanner and clear any previous scan results."""
        global gBLEScanner
        global gBLEDevices
        gBLEScanner = Scanner().withDelegate(ScanDelegate())
        gBLEDevices = []

    @staticmethod
    def _uuidMatches(candidate, target):
        """Return True when ``target`` starts with ``candidate``.

        This is the literal equivalent of ``re.match(candidate, target)``
        without treating device-supplied text as a regular expression, so a
        stray metacharacter can neither mis-match nor raise ``re.error``.
        """
        return str(target).startswith(str(candidate))

    def scan(self, uuid, timeout=0.15, autoloop=False):
        """Scan for ``timeout`` seconds and return devices matching ``uuid``.

        Args:
            uuid: UUID string to look for.
            timeout: duration passed to the scanner's ``scan()``.
            autoloop: unused; kept for interface compatibility.

        Returns:
            A list of ``Device`` objects, or None when nothing matched.
        """
        global gBLEDevices
        # The scan delegate fills this list while scan() runs.
        gBLEDevices = []
        gBLEScanner.scan(timeout)
        matchDevs = [
            Device(addr, addrType, rssi, advertised)
            for (addr, addrType, rssi, advertised) in gBLEDevices
            if self._uuidMatches(advertised, uuid)
        ]
        # Callers test the result against None, so never return an empty list.
        return matchDevs or None

    def connectTo(self, dev):
        """Connect to ``dev`` and install a ``PeripheralDelegate`` on it."""
        self.mPeripheral = btle.Peripheral(dev.addr, dev.addrType)
        self.mPeripheral.withDelegate(PeripheralDelegate())

    def disconnect(self):
        """Disconnect the current peripheral, if any, and forget it."""
        if self.mPeripheral is None:
            return
        print("disconnect!!")
        self.mPeripheral.disconnect()
        self.mPeripheral = None

    def getHandle(self, uuid):
        """Return the handle of the first descriptor whose UUID matches.

        Args:
            uuid: characteristic UUID string.

        Returns:
            The matching descriptor's ``handle``, or None when not connected
            or when no descriptor matches.
        """
        if self.mPeripheral is None:
            return None
        for desc in self.mPeripheral.getDescriptors():
            if self._uuidMatches(desc.uuid, uuid):
                return desc.handle
        return None

    def writeCharacteristic(self, handle, data):
        """Write ``data`` to ``handle``; does nothing when not connected.

        The peripheral call is made with ``True`` as its third argument
        (bluepy's ``withResponse``).
        """
        if self.mPeripheral is not None:
            self.mPeripheral.writeCharacteristic(handle, data, True)

    def readCharacteristic(self, uuid):
        """Read a characteristic value.

        Args:
            uuid: either an integer handle, used as-is, or a UUID string
                that is first resolved with ``getHandle``.

        Returns:
            The value read, or None when not connected or when the UUID
            cannot be resolved to a handle.
        """
        if self.mPeripheral is None:
            return None
        # The previous body read an undefined name ``handle`` and raised
        # NameError on every connected call.
        handle = uuid if isinstance(uuid, int) else self.getHandle(uuid)
        if handle is None:
            return None
        return self.mPeripheral.readCharacteristic(handle)

    def waitNotification(self, time=10):
        """Wait up to about ``time`` seconds for a notification.

        Args:
            time: total wait in seconds, polled in 0.1 second slices.

        Returns:
            The first payload queued by the delegate, or None on timeout or
            when not connected.
        """
        global gBLENotify
        gBLENotify = []
        if self.mPeripheral is None:
            return None
        # Integer poll count: a float countdown never reaches exactly 0 for
        # values that are not a multiple of the interval (or are negative),
        # which made the old loop spin forever.
        polls = max(0, int(round(time / self._POLL_INTERVAL)))
        for _ in range(polls):
            self.mPeripheral.waitForNotifications(self._POLL_INTERVAL)
            if gBLENotify:
                return gBLENotify[0]
        return None
"""sample 2019-4-15
setupBLE()
central = Central()
while True:
devs = central.scan("00000000-0000-0000-0000-000000000000")
if not devs == None:
break
central.connectTo(devs[0])
central.mPeripheral.setDelegate(PeripheralDelegate())
handle = central.getHandle("00000000-0000-0000-0000-000000000001")
svc = central.mPeripheral.getServiceByUUID("00000000-0000-0000-0000-000000000000")
ch = svc.getCharacteristics("00000000-0000-0000-0000-000000000001")[0]
central.mPeripheral.writeCharacteristic(ch.valHandle+1, "\x01\x00")
while True:
if central.mPeripheral.waitForNotifications(1.0):
print("get notif")
continue
print("waiting")
"""
"""
print("scan phase.")
while True:
devs = central.scan("00000000-0000-0000-0000-000000000000")
if not devs == None:
break
print("connect phase.")
central.connectTo(devs[0])
print("notify phase.")
notify = central.waitNotification(10)
if not (notify is None):
print("Notify : " + notify)
#handle = central.getHandle("00000000-0000-0000-0000-000000000001")
#print("write")
#central.writeCharacteristic(handle, b"BodyWeight")
#print("read")
#data = central.readCharacteristic(handle)
#if data is None:
# print("None")
#else:
# print(data)
print("disconnect.")
central.disconnect()
"""