-
Notifications
You must be signed in to change notification settings - Fork 1
/
radar_points.py
244 lines (217 loc) · 9.13 KB
/
radar_points.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import numpy as np
class RadarFrame:
    """Radar points belonging to one frame, stored as parallel lists.

    Index ``i`` of ``sid``, ``x``, ``y``, ``z``, ``ts`` and ``is_static`` (and
    of ``tlvs`` when the frame is a TLV frame) all describe the same point.
    """

    def __init__(self, data: "list[dict[str, int or float]]", isTLVframe=False):
        """
        Radar frame object contains data for a defined frame interval in lists for each attribute
        param data: a list of dictionaries
            ex. [{
                    'sensorId': 2,
                    'x': -280.35359191052436,
                    'y': 524.516705459526,
                    'z': 875.3924645059872,
                    'timestamp': 1663959822542,
                    'isStatic': 0    (not read here; is_static always starts at -1)
                }, ...]
        param isTLVframe: when True, every dictionary must also carry a
            'TLV_type' key, which is collected into self.tlvs
        """
        self.isTLVframe = isTLVframe
        self.sid = []
        self.x = []
        self.y = []
        self.z = []
        self.ts = []
        self.is_static = []  # -1 default, 1 static, 0 not static.
        self.tlvs = []
        for item in data:
            self.sid.append(item["sensorId"])
            self.x.append(item["x"])
            self.y.append(item["y"])
            self.z.append(item["z"])
            self.ts.append(item["timestamp"])
            self.is_static.append(-1)  # updated later through set_static_points()
            if self.isTLVframe:
                self.tlvs.append(item["TLV_type"])

    def __repr__(self):
        class_str = f"RadarFrame object with {len(self.sid)} points."
        if len(self.sid) > 0:
            class_str += f" Sensor id: {set(self.sid)}, starting ts: {self.ts[0]}, ending ts: {self.ts[-1]}"
        return class_str

    def is_empty(self, target_sensor_id=None) -> bool:
        """Return True when the frame has no points.

        With ``target_sensor_id`` given, only points of that sensor are
        considered; otherwise points of every sensor count.
        """
        if target_sensor_id is None:
            return len(self.sid) == 0
        return target_sensor_id not in self.sid

    def get_points_for_display(self, sensor_id) -> list:
        """Return ``(x, y, z, is_static, tlv_type)`` tuples for one sensor.

        ``tlv_type`` is 0 for frames that were not built as TLV frames.
        """
        return [
            (
                self.x[i],
                self.y[i],
                self.z[i],
                self.is_static[i],
                self.tlvs[i] if self.isTLVframe else 0,
            )
            for i, point_sid in enumerate(self.sid)
            if point_sid == sensor_id
        ]

    # TODO: points_for_clustering not working as expected, each radar frame contains points for only 1 sensor at a
    # time. reformat how its used in main or just delete bc combining points for display works well.
    def points_for_clustering(self) -> list:
        """Return ``(x, y, z)`` for every point currently marked not static (0).

        Points still at the default status (-1) are left out as well.
        """
        return [
            (self.x[i], self.y[i], self.z[i])  # actual z value, not flattened
            for i, status in enumerate(self.is_static)
            if status == 0
        ]

    def get_xyz_coord(self, sensor_id) -> list:
        """Return ``(x, y, z)`` tuples for the points of ``sensor_id``."""
        return [
            (self.x[i], self.y[i], self.z[i])
            for i, point_sid in enumerate(self.sid)
            if point_sid == sensor_id
        ]

    def set_static_points(self, points_list: list) -> None:
        """Mark each point static (1) or not static (0) by exact (x, y, z) match.

        param points_list: sequence of (x, y, z) points regarded as static.
            An empty sequence leaves ``is_static`` untouched.
        raises ValueError: if the first entry does not have three components.
        """
        if len(points_list) == 0:
            return
        if len(points_list[0]) != 3:
            # An explicit raise survives ``python -O``, unlike an assert.
            raise ValueError("points_list contains tuples of (x,y,z)")
        # One hashed lookup per point instead of scanning points_list each
        # time; tuple() lets list-style points match the tuples built below.
        static_lookup = {tuple(point) for point in points_list}
        self.is_static[:] = [
            1 if point in static_lookup else 0
            for point in zip(self.x, self.y, self.z)
        ]
class RadarData:
    """All points read from the radar sensors, stored as parallel lists.

    Index ``i`` of ``sid``, ``x``, ``y``, ``z`` and ``ts`` (and of ``tlvs``
    for TLV-format data) describes one point. ``take_next_frame()`` removes
    points from the front of these lists as it hands them out.
    """

    def __init__(self, data: "list[dict[str, int or float]]", isTLVformat=False):
        """
        Radar data object: contains all data from radar sensors in lists for each attribute.
        Updated when frames are processed by take_next_frame()
        param data: a list of dictionaries
            ex. [{
                    'sensorId': 1,
                    'x': 85.43406302787685,
                    'y': 2069.789390083478,
                    'z': 1473.3243136313272,
                    'timestamp': 1663959820484
                }, ...]
        param isTLVformat: when True, every dictionary must also carry a
            'TLV_type' key, which is collected into self.tlvs
        """
        self.sid = []
        self.x = []
        self.y = []
        self.z = []
        self.ts = []
        self.tlvs = []
        self.isTransformed = False
        self.isTLVformat = isTLVformat
        for item in data:
            self.sid.append(item["sensorId"])
            self.x.append(item["x"])
            self.y.append(item["y"])
            self.z.append(item["z"])
            self.ts.append(item["timestamp"])
            if self.isTLVformat:
                self.tlvs.append(item["TLV_type"])
        self.__time_elapsed = 0  # sum of the intervals requested so far
        self.__initial_timestamp = None  # set on first use, see set_initial_timestamp()

    def __repr__(self):
        class_str = f"RadarData object: {self.get_num_sensors()} sensors. "
        class_str += f"{len(self.x)} points, "
        class_str += f"and {0 if not self.ts else self.ts[-1] - self.ts[0]}ms of data."
        return class_str

    def set_initial_timestamp(self) -> None:
        """Remember the first available timestamp as the time origin, once.

        Does nothing when the origin is already set or no data is available.
        """
        if self.__initial_timestamp is None and self.ts:
            self.__initial_timestamp = self.ts[0]

    def get_num_sensors(self) -> int:
        """Return how many of the sensor ids 1 and 2 occur in the data (0-2)."""
        return int(1 in self.sid) + int(2 in self.sid)

    def has_data(self) -> bool:
        """Return True while at least one point is left."""
        return len(self.x) > 0

    def transform_coord(self, s1_rotz, s1_rotx, s2_rotz, s2_rotx,
                        offsetx, offsety, offsetz):
        """Apply coordinate transformation.

        Each point becomes ``rotz @ (rotx @ [x, y, z])`` plus an offset, using
        the s1_* matrices and ``(offsetx, -offsety, offsetz)`` for sensor id 1
        and the s2_* matrices and ``(-offsetx, offsety, offsetz)`` for sensor
        id 2. The rotation arguments are used as 3x3 matrices.

        Calling this on already transformed data prints a warning and does
        nothing.

        raises RuntimeError: if a point has a sensor id other than 1 or 2.
            In that case no coordinate is modified.
        """
        if self.isTransformed:
            print("Warning: RadarData already transformed. No action taken.")
            return

        rotations = {1: (s1_rotz, s1_rotx), 2: (s2_rotz, s2_rotx)}
        offsets = {
            1: np.array([offsetx, -offsety, offsetz], dtype=float),
            2: np.array([-offsetx, offsety, offsetz], dtype=float),
        }

        # Collect results on the side so a failure half way through cannot
        # leave some points transformed while isTransformed is still False.
        new_x, new_y, new_z = [], [], []
        for sensor, x, y, z in zip(self.sid, self.x, self.y, self.z):
            if sensor not in rotations:
                # RuntimeError is what the previous bare ``raise`` produced.
                raise RuntimeError(
                    f"RadarData: Sensor ID not supported: {sensor!r}"
                )
            rot_z, rot_x = rotations[sensor]
            column = np.array([[x], [y], [z]], dtype=float)
            rotated = np.matmul(rot_z, np.matmul(rot_x, column))
            # asarray + reshape also flattens an np.matrix result to 3 scalars.
            moved = np.asarray(rotated, dtype=float).reshape(3) + offsets[sensor]
            new_x.append(float(moved[0]))
            new_y.append(float(moved[1]))
            new_z.append(float(moved[2]))

        self.x[:] = new_x
        self.y[:] = new_y
        self.z[:] = new_z
        self.isTransformed = True

    def take_next_frame(self, interval: int, isTLVframe=False) -> "RadarFrame":
        """Remove and return the points of the next time window as a RadarFrame.

        The window ends at ``first timestamp + sum of all intervals requested
        so far (including this one)``. Points are taken from the front of the
        lists up to, but not including, the first one whose timestamp is past
        that end.

        param interval: length of the window, in the unit of the timestamps
        param isTLVframe: when True the TLV type of every point is passed on
            to the frame
        returns: a RadarFrame, empty when no point falls inside the window.
            If there has never been any data, an empty frame is returned and
            the window is not advanced.
        """
        self.set_initial_timestamp()  # very first timestamp in data
        if self.__initial_timestamp is None:
            return RadarFrame([], isTLVframe)

        self.__time_elapsed += interval
        frame_last_ts = self.__initial_timestamp + self.__time_elapsed

        taken = 0
        for ts in self.ts:
            if ts <= frame_last_ts:
                taken += 1
            else:
                break
        if taken == 0:
            return RadarFrame([], isTLVframe)

        extracted_data = []
        for i in range(taken):
            point = {
                "sensorId": self.sid[i],
                "x": self.x[i],
                "y": self.y[i],
                "z": self.z[i],
                "timestamp": self.ts[i],
            }
            if isTLVframe:
                point["TLV_type"] = self.tlvs[i]
            extracted_data.append(point)

        del self.sid[:taken]
        del self.x[:taken]
        del self.y[:taken]
        del self.z[:taken]
        del self.ts[:taken]
        # Always trimmed (a no-op when empty) so TLV types stay aligned with
        # the remaining points even after a non-TLV frame was taken.
        del self.tlvs[:taken]
        return RadarFrame(extracted_data, isTLVframe)
class StaticPoints:
    """Track how many consecutive frames each exact point has appeared in.

    ``static_points`` and ``static_points_count`` are parallel lists: the
    count at index ``i`` belongs to the point at index ``i``.
    """

    def __init__(self, cnt_thres=10):
        """
        param cnt_thres: a point is reported as static once its count is
            greater than this value
        """
        self.static_points = []
        self.static_points_count = []
        self.cnt_max = 100  # max count for a point (not enforced by this class)
        self.cnt_thres = cnt_thres  # threshold for a point to be considered static

    def update(self, frame):
        """Fold one frame of points into the running counts.

        frame is an iterable of hashable points, e.g. coordinate tuples
        [(x1,y1,z1),(x2,y2,z2),...]. Points are compared by equality.

        Tracked points missing from ``frame`` are dropped together with their
        count; every occurrence of a point in ``frame`` adds 1 to its count,
        and points seen for the first time are appended in frame order.
        """
        frame = list(frame)  # a one-shot iterable is needed twice below
        present = set(frame)

        # dict keeps insertion order, so surviving points keep their relative
        # order and new points end up behind them, exactly like the lists.
        counts = {
            point: count
            for point, count in zip(self.static_points, self.static_points_count)
            if point in present
        }
        for point in frame:
            counts[point] = counts.get(point, 0) + 1

        self.static_points[:] = counts.keys()
        self.static_points_count[:] = counts.values()

    def get_static_points(self):
        """Return the tracked points whose count is greater than ``cnt_thres``."""
        return [
            point
            for point, count in zip(self.static_points, self.static_points_count)
            if count > self.cnt_thres
        ]