-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathinterface_IWR1443_withGraph.py
221 lines (159 loc) · 5.6 KB
/
interface_IWR1443_withGraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import _thread
import pickle
from threading import Thread, Event
import serial
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui
import datetime
import os
import time
import warnings
from classes.model_wrapper import NeuralNetwork
from utils.iwr1443_utils import readAndParseData14xx, parseConfigFile
isPredict = False
configFileName = '1443config.cfg'
CLIport = {}
Dataport = {}
"""
# global to hold the average x, y, z of detected points
those are for testing swipe gestures
note that for now, only x and y are used
those values is updated in the function: update
"""
x = []
y = []
z = []
doppler = []
# ------------------------------------------------------------------
# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
global CLIport
global Dataport
# Open the serial ports for the configuration and the data ports
# Raspberry pi
# CLIport = serial.Serial('/dev/ttyACM0', 115200)
# Dataport = serial.Serial('/dev/ttyACM1', 921600)
# For WINDOWS, CHANGE those serial port to match your machine's configuration
CLIport = serial.Serial('COM5', 115200)
Dataport = serial.Serial('COM4', 921600)
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
CLIport.write((i + '\n').encode())
print(i)
time.sleep(0.01)
return CLIport, Dataport
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Funtion to update the data and display in the plot
def update():
dataOk = 0
global detObj
x = []
y = []
z = []
doppler = []
# Read and parse the received data
dataOk, frameNumber, detObj = readAndParseData14xx(Dataport, configParameters)
if dataOk:
# print(detObj)
x = -detObj["x"]
y = detObj["y"]
z = detObj["z"]
doppler = detObj["doppler"] # doppler values for the detected points in m/s
draw_x_y.setData(x, y)
draw_z_v.setData(z, doppler)
QtGui.QApplication.processEvents()
return dataOk
# ------------------------- MAIN -----------------------------------------
today = datetime.datetime.now()
today = datetime.datetime.now()
root_dn = 'data/f_data-' + str(today).replace(':', '-').replace(' ', '_')
warnings.simplefilter('ignore', np.RankWarning)
# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)
# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)
# START QtAPPfor the plot
app = QtGui.QApplication([])
# Set the plot
pg.setConfigOption('background', 'w')
win = pg.GraphicsWindow(title="2D scatter plot")
fig_z_y = win.addPlot()
fig_z_y.setXRange(-0.5, 0.5)
fig_z_y.setYRange(0, 1.5)
fig_z_y.setLabel('left', text='Y position (m)')
fig_z_y.setLabel('bottom', text='X position (m)')
draw_x_y = fig_z_y.plot([], [], pen=None, symbol='o')
# set the processed plot
fig_z_v = win.addPlot()
fig_z_v.setXRange(-1, 1)
fig_z_v.setYRange(-1, 1)
fig_z_v.setLabel('left', text='Z position (m)')
fig_z_v.setLabel('bottom', text='Doppler (m/s)')
draw_z_v = fig_z_v.plot([], [], pen=None, symbol='o')
# Main loop
detObj = {}
frameData = {}
preprocessed_frameArray = []
# reading RNN model
if isPredict:
# regressive_classifier = NeuralNetwork()
# regressive_classifier.load(file_name='trained_models/radar_model/072319_02/regressive_classifier.h5')
onNotOn_ann_classifier = NeuralNetwork()
onNotOn_ann_classifier.load(file_name='F:/config_detection/models/onNotOn_ANN/classifier_080919_2.h5')
onNotOn_encoder = pickle.load(open('F:/config_detection/models/onNotOn_ANN/encoder_080919_2', 'rb'))
rnn_timestep = 100
num_padding = 50
def input_thread(a_list):
input()
interrupt_list.append(True)
class PredicationThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
pass
input("Press Enter to start!...")
# create the interrupt thread
interrupt_list = []
_thread.start_new_thread(input_thread, (interrupt_list,))
# start the prediction thread
main_stop_event = Event()
if isPredict:
thread = PredicationThread(main_stop_event)
thread.start()
start_time = time.time()
while True:
try:
# Update the data and check if the data is okay
dataOk = update()
if dataOk:
# Store the current frame into frameData
frameData[time.time()] = detObj
# frameRow = np.asarray([detObj['x'], detObj['y'], detObj['z'], detObj['doppler']]).transpose()
# preprocessed_frameArray.append(preprocess_frame(frameRow))
time.sleep(0.033) # This is framing frequency Sampling frequency of 30 Hz
if interrupt_list:
raise KeyboardInterrupt()
# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
CLIport.write(('sensorStop\n').encode())
CLIport.close()
Dataport.close()
win.close()
# stop prediction thread
main_stop_event.set()
# save radar frame data
is_save = input('do you wish to save the recorded frames? [y/n]')
if is_save == 'y':
os.mkdir(root_dn)
file_path = os.path.join(root_dn, 'f_data.p')
with open(file_path, 'wb') as pickle_file:
pickle.dump(frameData, pickle_file)
else:
print('exit without saving')
break