-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: GPS Simulator #45
base: master
Are you sure you want to change the base?
Changes from 8 commits
a7a2456
f013966
4eead5a
9402309
9b6f1da
7f697cf
6d7fdb2
3050a3a
49dfae1
d6d715c
cdd2aea
e60096e
a4254e4
23e9efc
aca0fdc
2d818b5
f99f7c5
97f04ad
0d4006b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
cmake_minimum_required(VERSION 2.8.3) | ||
project(gps_sim) | ||
|
||
find_package(catkin_simple REQUIRED) | ||
|
||
catkin_simple() | ||
cs_install() | ||
cs_export() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<?xml version="1.0"?> | ||
<package format="2"> | ||
<name>gps_sim</name> | ||
<version>1.0.0</version> | ||
<description>More realistic GPS simulation.</description> | ||
|
||
<maintainer email="[email protected]">Michael Pantic</maintainer> | ||
<license>BSD</license> | ||
<buildtool_depend>catkin</buildtool_depend> | ||
<buildtool_depend>catkin_simple</buildtool_depend> | ||
<depend>geometric_msgs</depend> | ||
<depend>nav_msgs</depend> | ||
|
||
</package> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
import colorednoise as cn | ||
import numpy as np | ||
|
||
from matplotlib import mlab | ||
import matplotlib.pyplot as plt | ||
|
||
|
||
class GpsNoiser: | ||
|
||
# multiplier for each of the noise types, each of the axes | ||
def __init__(self, white=np.array([0.1, 0.1, 0.1]), | ||
pink=np.array([0.05, 0.05, 0.6]), | ||
brown=np.array([0.5, 0.5, 0.1]), epsilons=np.array([0, 1, 2.5])): | ||
self._white = white | ||
self._pink = pink | ||
self._brown = brown | ||
self._sample_buffer_length = 7200 # [s] | ||
self._sampling_freq = 10 # [hz] | ||
# order: white, pink, brown | ||
self._sample_buffer = np.zeros([self._sample_buffer_length * self._sampling_freq, 9]) | ||
self._sample_buffer_pos = 0 | ||
self._epsilons = epsilons | ||
|
||
def sample_noise(self): | ||
# make some noise! | ||
n = self._sample_buffer_length * self._sampling_freq | ||
for i in range(0, 3): | ||
self._sample_buffer[:, i] = cn.powerlaw_psd_gaussian(self._epsilons[0], n) # white | ||
self._sample_buffer[:, i + 3] = cn.powerlaw_psd_gaussian(self._epsilons[1], n) # pink | ||
self._sample_buffer[:, i + 6] = cn.powerlaw_psd_gaussian(self._epsilons[2], n, 0.001) # brown | ||
self._sample_buffer_pos = 0 | ||
|
||
def perturb(self, pos_enu): | ||
if self._sample_buffer_pos >= self._sample_buffer_length * self._sampling_freq: | ||
print("Run out of sampling buffer") | ||
# redo -> might cause discontinuity in noise, should be avoided | ||
self.sample_noise() | ||
|
||
i = self._sample_buffer_pos | ||
self._sample_buffer_pos += 1 | ||
|
||
return pos_enu + np.multiply(self._white, self._sample_buffer[i, 0:3]) + \ | ||
np.multiply(self._pink, self._sample_buffer[i, 3:6]) + \ | ||
np.multiply(self._brown, self._sample_buffer[i, 6:9]) | ||
|
||
|
||
if __name__ == "__main__": | ||
# RTK: | ||
# a = GpsNoiser(white=np.array([0.0005, 0.0005, 0.002]), | ||
# pink=np.array([0.0035, 0.0035, 0.007]), | ||
# brown=np.array([0.001, 0.001, 0.002]), | ||
# epsilons=np.array([0, 1, 2])) | ||
|
||
# SPP: | ||
a = GpsNoiser(white=np.array([0.0075, 0.0075, 0.015]), | ||
pink=np.array([0.001, 0.001, 0.002]), | ||
brown=np.array([0.03, 0.03, 0.06]), | ||
epsilons=np.array([0, 1, 3.0])) | ||
|
||
a.sample_noise() | ||
|
||
# load true rtk data. | ||
name = "FLOAT" | ||
rtk_data = np.load("/home/mpantic/Work/ARMA/GPS/bags/bag_sim_gps/spp_2021-02-18-16-31-50.bag_np.npy") | ||
# rtk_data = np.load("/home/mpantic/Work/ARMA/GPS/bags/bag_sim_gps/rtk_2021-02-18-16-21-18.bag_np.npy") | ||
rtk_data -= rtk_data.mean(axis=0) | ||
|
||
# generate fake data of same length | ||
sim_data = np.zeros(rtk_data.shape) | ||
for i in range(0, len(rtk_data)): | ||
sim_data[i, :] = a.perturb(np.array([0.0, 0.0, 0.0])) | ||
|
||
|
||
print("empirical covariance") | ||
print(np.cov(sim_data.T)) | ||
|
||
exit(0) | ||
|
||
wa = 2 # working axis | ||
s_true, f_true = mlab.psd(rtk_data[:, wa], NFFT=2 ** 12) | ||
s_sim, f_sim = mlab.psd(sim_data[:, wa], NFFT=2 ** 12) | ||
|
||
plt.figure(figsize=[6, 6]) | ||
plt.loglog(f_true, s_true, 'r', alpha=0.25) | ||
plt.loglog(f_sim, s_sim, 'b', alpha=0.25) | ||
plt.xlim([10 ** -3.5, 1]) | ||
plt.ylim([10 ** -9, 10 ** 3]) | ||
plt.title("Noise Power Spectrum " + name + " (blue=sim/red=real data)") | ||
plt.xlabel("Freq") | ||
plt.ylabel("Amp") | ||
plt.grid(True) | ||
plt.savefig("power_spec_" + name + ".png") | ||
|
||
plt.figure(figsize=[6, 6]) | ||
plt.scatter(rtk_data[:, 0], rtk_data[:, 2], c=np.arange(len(rtk_data)), marker="x") | ||
plt.title("Bias Free " + name + " ENU Position (REAL DATA)") | ||
plt.xlabel("x_enu [m]") | ||
plt.ylabel("y_enu [m]") | ||
plt.grid(True) | ||
plt.savefig("enu_xy_" + name + "_real.png") | ||
|
||
plt.figure(figsize=[6, 6]) | ||
plt.scatter(sim_data[:, 0], sim_data[:, 2], c=np.arange(len(sim_data)), marker="x") | ||
plt.title("Bias Free " + name + " ENU Position (SIM DATA)") | ||
plt.xlabel("x_enu [m]") | ||
plt.ylabel("y_enu [m]") | ||
plt.grid(True) | ||
plt.savefig("enu_xy_" + name + "_sim.png") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import numpy as np | ||
from GpsNoiser import GpsNoiser | ||
from Magnetometer import Magnetometer | ||
|
||
|
||
class GpsSimulator: | ||
|
||
def __init__(self): | ||
self._rtk_noiser = GpsNoiser(white=np.array([0.0005, 0.0005, 0.002]), | ||
pink=np.array([0.0035, 0.0035, 0.007]), | ||
brown=np.array([0.001, 0.001, 0.002]), | ||
epsilons=np.array([0, 1, 2])) | ||
self._rtk_noiser.sample_noise() | ||
|
||
self._float_noiser = GpsNoiser(white=np.array([0.0075, 0.0075, 0.015]), | ||
pink=np.array([0.001, 0.001, 0.002]), | ||
brown=np.array([0.03, 0.03, 0.06]), | ||
epsilons=np.array([0, 1, 3.0])) | ||
self._float_noiser.sample_noise() | ||
|
||
self._spp_noiser = GpsNoiser(white=np.array([0.001, 0.001, 0.002]), | ||
pink=np.array([0.0, 0.0, 0.0]), | ||
brown=np.array([0.50, 0.5, 1.0]), | ||
epsilons=np.array([0, 1, 3.0])) | ||
self._spp_noiser.sample_noise() | ||
|
||
self._spp_output_cov = np.array([[1.15363866, 0.00769917, -0.02256928], | ||
[0.00769917, 1.16177632, -0.03191735], | ||
[-0.02256928, -0.03191735, 5.23445582]]) | ||
|
||
# empirical covariances from imagined model not based on data | ||
# -> to be based on data at a later point. | ||
self._float_output_cov = np.array([[8.17408501e-04, 7.13988338e-05, 1.47721410e-04], | ||
[7.13988338e-05, 1.01484400e-03, -1.63778765e-04], | ||
[1.47721410e-04, -1.63778765e-04, 2.07055086e-03]]) | ||
|
||
self._rtk_output_cov = np.array([[9.47897207e-05, 1.92104152e-05, -6.46848160e-05], | ||
[1.92104152e-05, 9.39087989e-05, -7.25870764e-05], | ||
[-6.46848160e-05, -7.25870764e-05, 4.24079132e-04]]) | ||
|
||
self._mag_noiser = Magnetometer() | ||
|
||
# can be: auto, none, spp, float, rtk | ||
self._current_mode = "spp" | ||
|
||
self._rtk_polygon = [] | ||
self._spp_polygon = [] | ||
self._float_polygon = [] | ||
self._none_polygon = [] | ||
|
||
|
||
def set_mode(self, mode): | ||
self._current_mode = mode | ||
|
||
def get_mag(self, input_rot): | ||
return self._mag_noiser.get_field(input_rot) | ||
|
||
def get_gps(self, input_enu, fixtype): | ||
if fixtype is "none": | ||
# *screaming" lost fix!! | ||
return None, None | ||
|
||
output_enu = np.zeros([0, 0, 0]) | ||
output_cov = np.eye(3) | ||
|
||
if fixtype is "rtk": | ||
output_enu = self._rtk_noiser.perturb(input_enu) | ||
output_cov = self._rtk_output_cov | ||
|
||
elif fixtype is "float": | ||
output_enu = self._float_noiser.perturb(input_enu) | ||
output_cov = self._float_output_cov | ||
|
||
elif fixtype is "spp": | ||
output_enu = self._spp_noiser.perturb(input_enu) | ||
output_cov = self._spp_output_cov | ||
|
||
return output_enu, output_cov | ||
|
||
def get_fixtype(self, input_enu): | ||
return "rtk" # for now | ||
|
||
def simulate(self, input_pose): | ||
input_enu = input_pose[0:3, 3] | ||
|
||
# output GPS | ||
if self._current_mode is "auto": | ||
fixtype = self.get_fixtype(input_enu) | ||
output_enu, output_cov = self.get_gps(input_enu, fixtype) | ||
else: | ||
output_enu, output_cov = self.get_gps(input_enu, self._current_mode) | ||
|
||
output_mag = self.get_mag(input_pose[0:3, 0:3]) | ||
|
||
# weird pythonic flex that these variables are found | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lol yeah... |
||
return output_enu, output_cov, output_mag |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import geomag as gm | ||
import numpy as np | ||
|
||
import matplotlib | ||
import matplotlib.pyplot as plt | ||
|
||
|
||
class Magnetometer: | ||
def __init__(self, noise_cov=500): | ||
# set initial field strength | ||
magnetic_field = gm.geomag.GeoMag() | ||
magnetic_data = magnetic_field.GeoMag(47.37, 8.53) | ||
self._field_vector = np.array([magnetic_data.bx, magnetic_data.by, magnetic_data.bz]) | ||
# d = | ||
self._noise_cov = noise_cov | ||
self._declination = magnetic_data.dec | ||
|
||
# takes current orientation and rotates magnetic field according to this | ||
# currently does not include non-linearities in calibration | ||
def get_field(self, orientation=np.eye(3)): | ||
noised_vector = self._field_vector + np.random.normal([0, 0, 0], | ||
[self._noise_cov, self._noise_cov, self._noise_cov]) | ||
return orientation.dot(noised_vector) | ||
|
||
def get_declination(self): | ||
return self._declination | ||
|
||
|
||
if __name__ == "__main__": | ||
a = Magnetometer() | ||
magnetic_field = np.zeros([1000, 3]) | ||
resulting_heading = np.zeros([1000, 1]) | ||
for i in range(0, 1000): | ||
magnetic_field[i, :] = a.get_field() | ||
resulting_heading[i] = np.arctan2(magnetic_field[i, 1], magnetic_field[i, 0]) * (180 / 3.1415) | ||
|
||
fig, ax = plt.subplots() | ||
# ax.plot(magnetic_field[0:1000,0],'r') | ||
# ax.plot(magnetic_field[0:1000, 1], 'g') | ||
# ax.plot(magnetic_field[0:1000, 2], 'b') | ||
ax.plot(resulting_heading, 'r') | ||
|
||
ax.grid() | ||
plt.show() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
#!/usr/bin/env python | ||
import rospy | ||
import numpy as np | ||
from std_msgs.msg import String | ||
from nav_msgs.msg import Odometry | ||
from sensor_msgs.msg import MagneticField | ||
from GpsSimulator import GpsSimulator | ||
from tf import transformations | ||
|
||
class GpsSimNode: | ||
|
||
def __init__(self): | ||
rospy.init_node('gps_sim_node', anonymous=True) | ||
|
||
self.odom_pub = rospy.Publisher('gps_out', Odometry, queue_size=10) | ||
self.mag_pub = rospy.Publisher('mag_out', MagneticField, queue_size=10) | ||
|
||
self.gps_sim = GpsSimulator() | ||
rospy.Subscriber("/HeadHelmety/vrpn_client/estimated_odometry", Odometry, self.odom_callback) | ||
|
||
def odom_callback(self, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we throttle the rate anywhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep now it does |
||
# extract pose | ||
input_pos = np.array([data.pose.pose.position.x, data.pose.pose.position.y, data.pose.pose.position.z]) | ||
input_pose = transformations.quaternion_matrix(np.array([ | ||
data.pose.pose.orientation.x, | ||
data.pose.pose.orientation.y, | ||
data.pose.pose.orientation.z, | ||
data.pose.pose.orientation.w | ||
])) | ||
|
||
# assmeble 4x4 matrix | ||
input_pose[0:3,3] = input_pos | ||
input_stamp = data.header.stamp | ||
|
||
enu_pos, enu_cov, mag_data = self.gps_sim.simulate(input_pose) | ||
|
||
# assemble messages. | ||
mag_msg = MagneticField() | ||
mag_msg.header.stamp = input_stamp | ||
mag_msg.magnetic_field.x = mag_data[0] | ||
mag_msg.magnetic_field.y = mag_data[1] | ||
mag_msg.magnetic_field.z = mag_data[2] | ||
|
||
print(np.arctan2(mag_data[1], mag_data[0]) * (180 / 3.1415)) | ||
self.mag_pub.publish(mag_msg) | ||
|
||
if enu_pos is None or enu_cov is None: | ||
# no gps data :( | ||
return | ||
|
||
odom_msg = Odometry() | ||
odom_msg.header.stamp = input_stamp | ||
odom_msg.pose.pose.position.x = enu_pos[0] | ||
odom_msg.pose.pose.position.y = enu_pos[1] | ||
odom_msg.pose.pose.position.z = enu_pos[2] | ||
odom_msg.pose.covariance[0] = enu_cov[0, 0] | ||
odom_msg.pose.covariance[1] = enu_cov[0, 1] | ||
odom_msg.pose.covariance[2] = enu_cov[0, 2] | ||
odom_msg.pose.covariance[6] = enu_cov[1, 0] | ||
odom_msg.pose.covariance[7] = enu_cov[1, 1] | ||
odom_msg.pose.covariance[8] = enu_cov[1, 2] | ||
odom_msg.pose.covariance[12] = enu_cov[2, 0] | ||
odom_msg.pose.covariance[13] = enu_cov[2, 1] | ||
odom_msg.pose.covariance[14] = enu_cov[2, 2] | ||
self.odom_pub.publish(odom_msg) | ||
|
||
|
||
if __name__ == '__main__': | ||
node = GpsSimNode() | ||
rospy.spin() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this last number here? Might be worth naming this constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or use the argument name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍