-
Notifications
You must be signed in to change notification settings - Fork 0
/
fusionkf.py
103 lines (78 loc) · 2.99 KB
/
fusionkf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from kalmanfilter import KalmanFilter
from datapoint import DataPoint
from tools import calculate_jacobian, cartesian_to_polar, time_difference
import numpy as np
class FusionKF:
"""
A class that gets sensor measurements from class DataPoint
and predicts the next state of the system using an extended Kalman filter algorithm
The state variables we are considering in this system are the position and velocity
in x and y cartesian coordinates, in essence there are 4 variables we are tracking.
In particular, an instance of this class gets measurements from both lidar and radar sensors
lidar sensors measure positions in cartesian coordinates (2 values)
radar sensors measure position and velocity in polar coordinates (3 values)
lidar sensor are linear and radar sensors are non-linear, so we use the jacobian algorithm
to compute the state transition matrix H unlike a simple kalman filter.
"""
def __init__(self, d):
self.initialized = False
self.timestamp = 0
self.n = d['number_of_states']
self.P = d['initial_process_matrix']
self.F = d['inital_state_transition_matrix']
self.Q = d['initial_noise_matrix']
self.radar_R = d['radar_covariance_matrix']
self.lidar_R = d['lidar_covariance_matrix']
self.lidar_H = d['lidar_transition_matrix']
self.a = (d['acceleration_noise_x'], d['acceleration_noise_y'])
self.kalmanFilter = KalmanFilter(self.n)
def updateQ(self, dt):
dt2 = dt * dt
dt3 = dt * dt2
dt4 = dt * dt3
x, y = self.a
r11 = dt4 * x / 4
r13 = dt3 * x / 2
r22 = dt4 * y / 4
r24 = dt3 * y / 2
r31 = dt3 * x / 2
r33 = dt2 * x
r42 = dt3 * y / 2
r44 = dt2 * y
Q = np.matrix([[r11, 0, r13, 0],
[0, r22, 0, r24],
[r31, 0, r33, 0],
[0, r42, 0, r44]])
self.kalmanFilter.setQ(Q)
def update(self, data):
dt = time_difference(self.timestamp, data.get_timestamp())
self.timestamp = data.get_timestamp()
self.kalmanFilter.updateF(dt)
self.updateQ(dt)
self.kalmanFilter.predict()
z = np.matrix(data.get_raw()).T
x = self.kalmanFilter.getx()
if data.get_name() == 'radar':
px, py, vx, vy = x[0, 0], x[1, 0], x[2, 0], x[3, 0]
rho, phi, drho = cartesian_to_polar(px, py, vx, vy)
H =np.matrix([[1, 0, 0, 0],
[0, 1, 0, 0],[0,0,1,0]])
Hx = (np.matrix([[rho, phi, drho]])).T
R = self.radar_R
elif data.get_name() == 'lidar':
H = self.lidar_H
Hx = self.lidar_H * x
R = self.lidar_R
self.kalmanFilter.update(z, H, Hx, R)
def start(self, data):
self.timestamp = data.get_timestamp()
x = np.matrix([data.get()]).T
self.kalmanFilter.start(x, self.P, self.F, self.Q)
self.initialized = True
def process(self, data):
if self.initialized:
self.update(data)
else:
self.start(data)
def get(self):
return self.kalmanFilter.getx()