forked from Meta-Team/Meta-Terminal-III
-
Notifications
You must be signed in to change notification settings - Fork 0
/
device.py
112 lines (88 loc) · 2.94 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import socket
from PyQt5.QtCore import QThread, pyqtSignal
import serial
from time import sleep
# import bluetooth
class Manager_Base(QThread):
device_signal = pyqtSignal(bytes)
connection_signal = pyqtSignal(bool)
def __init__(self, device_name:str):
super(Manager_Base, self).__init__()
self.device_name = device_name
def run(self):
pass
def SendData(self, send_msg):
pass
def stop(self):
pass
meta_serial_baudrate = 115200
class Serial_Manager(Manager_Base):
def __init__(self, device_name:str):
super(Serial_Manager, self).__init__(device_name=device_name)
self.__ser = None
self.alive = False
def run(self):
try:
self.__ser = serial.Serial(port=self.device_name, baudrate=meta_serial_baudrate, timeout=None)
if self.__ser.isOpen():
self.connection_signal.emit(True)
trash = self.__ser.read_all()
self.alive = True
while self.alive and self.__ser.isOpen():
n = self.__ser.inWaiting()
if n:
self.device_signal.emit(self.__ser.read_all())
sleep(0.016)
except:
pass
self.connection_signal.emit(False)
def SendData(self, send_msg):
return self.__ser.write(send_msg)
def stop(self):
self.alive = False
if self.__ser.isOpen():
self.__ser.close()
meta_port = 2021
class Socket_Manager(Manager_Base):
def __init__(self, device_name:str):
super(Socket_Manager, self).__init__(device_name=device_name)
self.__socket = None
self.alive = False
def run(self):
try:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.connect((self.device_name, meta_port))
self.connection_signal.emit(True)
self.alive = True
while self.alive:
data = self.__socket.recv(100)
if len(data) > 0:
self.device_signal.emit(data)
sleep(0.016)
except Exception as e:
print(e)
finally:
self.__socket.close()
self.connection_signal.emit(False)
def SendData(self, send_msg):
return self.__socket.send(send_msg)
def stop(self):
self.alive = False
class Bluetooth_Manager(Manager_Base):
def __init__(self, device_name:str):
super().__init__(device_name)
self.__bt = None
self.alive = False
def run(self):
# self.__bt = bluetooth.discover_devices()
pass
# while self.alive:
# n = self.__bt.inWaiting()
# if n:
def SendData(self, send_msg):
# return self.__bt.write(send_msg)
return None
def stop(self):
self.alive = False
# if self.__bt.isOpen():
# self.__bt.close()