diff --git a/rb_ws/src/buggy/launch/main.launch b/rb_ws/src/buggy/launch/main.launch
old mode 100644
new mode 100755
diff --git a/rb_ws/src/buggy/launch/sim_2d_2buggies.launch b/rb_ws/src/buggy/launch/sim_2d_2buggies.launch
old mode 100644
new mode 100755
index 4f78a3d9..2cda83d1
--- a/rb_ws/src/buggy/launch/sim_2d_2buggies.launch
+++ b/rb_ws/src/buggy/launch/sim_2d_2buggies.launch
@@ -1,40 +1,48 @@
-
-
-
-
-
-
-
-
+
-
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
diff --git a/rb_ws/src/buggy/launch/sim_2d_single.launch b/rb_ws/src/buggy/launch/sim_2d_single.launch
old mode 100644
new mode 100755
index 75cc97c3..1f3e4974
--- a/rb_ws/src/buggy/launch/sim_2d_single.launch
+++ b/rb_ws/src/buggy/launch/sim_2d_single.launch
@@ -1,23 +1,30 @@
-
-
-
-
-
-
-
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/2d_sim/engine.py b/rb_ws/src/buggy/scripts/2d_sim/engine.py
index 3d48bd34..d0adb4e2 100755
--- a/rb_ws/src/buggy/scripts/2d_sim/engine.py
+++ b/rb_ws/src/buggy/scripts/2d_sim/engine.py
@@ -1,13 +1,15 @@
#! /usr/bin/env python3
+
import sys
import threading
+
+import rospy
from geometry_msgs.msg import Pose, Twist, PoseWithCovariance, TwistWithCovariance
from std_msgs.msg import Float64
from sensor_msgs.msg import NavSatFix
from nav_msgs.msg import Odometry
import numpy as np
import utm
-import rospy
class Simulator:
@@ -44,14 +46,16 @@ def __init__(self, starting_pose, velocity, buggy_name):
buggy_name + "/state/pose_navsat", NavSatFix, queue_size=1
)
- # to plot on Foxglove (with noise)
- self.navsatfix_noisy_publisher = rospy.Publisher(
- buggy_name + "/state/pose_navsat_noisy", NavSatFix, queue_size=1
- )
+ if Simulator.NOISE:
+ # to plot on Foxglove (with noise)
+ self.navsatfix_noisy_publisher = rospy.Publisher(
+ buggy_name + "/state/pose_navsat_noisy", NavSatFix, queue_size=1
+ )
+
# (UTM east, UTM north, HEADING(degs))
self.starting_poses = {
- "Hill1_SC": (Simulator.UTM_EAST_ZERO + 60, Simulator.UTM_NORTH_ZERO + 150, -110),
- "Hill1_NAND": (Simulator.UTM_EAST_ZERO + 55, Simulator.UTM_NORTH_ZERO + 165, -125),
+ "Hill1_NAND": (Simulator.UTM_EAST_ZERO + 0, Simulator.UTM_NORTH_ZERO + 0, -110),
+ "Hill1_SC": (Simulator.UTM_EAST_ZERO + 20, Simulator.UTM_NORTH_ZERO + 30, -110),
}
# Start position for End of Hill 2
@@ -70,8 +74,8 @@ def __init__(self, starting_pose, velocity, buggy_name):
self.velocity = velocity # m/s
self.steering_angle = 0 # degrees
- self.rate = 100 # Hz
- self.pub_skip = 10 # publish every pub_skip ticks
+ self.rate = 200 # Hz
+ self.pub_skip = 1 # publish every pub_skip ticks
self.lock = threading.Lock()
diff --git a/rb_ws/src/buggy/scripts/2d_sim/velocity_ui.py b/rb_ws/src/buggy/scripts/2d_sim/velocity_ui.py
old mode 100644
new mode 100755
index 8a2061c4..09840e3a
--- a/rb_ws/src/buggy/scripts/2d_sim/velocity_ui.py
+++ b/rb_ws/src/buggy/scripts/2d_sim/velocity_ui.py
@@ -13,7 +13,7 @@ def __init__(self, init_vel: float, buggy_name: str):
self.root = tk.Tk()
- self.root.title('Manual Velocity')
+ self.root.title(buggy_name + ' Manual Velocity: scale = 0.1m/s')
self.root.geometry('600x100')
self.root.configure(background = '#342d66')
@@ -24,7 +24,7 @@ def __init__(self, init_vel: float, buggy_name: str):
self.root.bind("", lambda d: self.scale.set(self.scale.get() - 2))
def step(self):
- """sets velocity of buggy to the current scale value
+ """sets velocity of buggy to the current scale value
called once every tick
"""
self.root.update_idletasks()
diff --git a/rb_ws/src/buggy/scripts/auton/autonsystem.py b/rb_ws/src/buggy/scripts/auton/autonsystem.py
index 8b50232d..d40d509c 100755
--- a/rb_ws/src/buggy/scripts/auton/autonsystem.py
+++ b/rb_ws/src/buggy/scripts/auton/autonsystem.py
@@ -1,27 +1,28 @@
#!/usr/bin/env python3
-import sys
+import argparse
+import cProfile
from threading import Lock
-import numpy as np
import rospy
# ROS Message Imports
from std_msgs.msg import Float32, Float64, Bool
from nav_msgs.msg import Odometry
+import numpy as np
+
from trajectory import Trajectory
from world import World
from controller import Controller
from pure_pursuit_controller import PurePursuitController
from stanley_controller import StanleyController
-from model_predictive_controller import ModelPredictiveController
from brake_controller import BrakeController
-# from model_predictive_controller import ModelPredictiveController
-from model_predictive_interpolation import ModelPredictiveController
+from model_predictive_controller import ModelPredictiveController
+# from model_predictive_interpolation import ModelPredictiveController
+from path_planner import PathPlanner
from pose import Pose
-
class AutonSystem:
"""
Top-level class for the RoboBuggy autonomous system
@@ -29,153 +30,256 @@ class AutonSystem:
On every tick, this class will read the current state of the buggy,
compare it to the reference trajectory, and use a given controller
to compute the desired control output.
+
+ On every 10th tick, this class will generate a new reference trajectory
+ based on the updated position of the other buggy.
"""
- trajectory: Trajectory = None
- controller: Controller = None
+ global_trajectory: Trajectory = None
+ local_controller: Controller = None
brake_controller: BrakeController = None
lock = None
steer_publisher = None
ticks = 0
- def __init__(self, trajectory, controller, brake_controller, buggy_name, is_sim) -> None:
- self.trajectory = trajectory
- self.controller = controller
+ def __init__(self,
+ global_trajectory,
+ local_controller,
+ brake_controller,
+ self_name,
+ other_name,
+ profile) -> None:
+
+
+ self.global_trajectory = global_trajectory
+
+ # local trajectory is initialized as global trajectory. If there is no other buggy,
+ # the local trajectory is never updated.
+
+ self.has_other_buggy = not other_name is None
+ self.cur_traj = global_trajectory
+ self.local_controller = local_controller
self.brake_controller = brake_controller
+
+ self.path_planner = PathPlanner(global_trajectory)
+ self.other_steering = 0
+
self.lock = Lock()
self.ticks = 0
- self.msg = None
- if (is_sim):
- rospy.Subscriber(buggy_name + "/nav/odom", Odometry, self.update_msg)
+ self.self_odom_msg = None
+ self.other_odom_msg = None
- rospy.Subscriber(buggy_name + "nav/odom", Odometry, self.update_msg)
+ rospy.Subscriber(self_name + "/nav/odom", Odometry, self.update_self_odom)
+ if self.has_other_buggy:
+ rospy.Subscriber(other_name + "/nav/odom", Odometry, self.update_other_odom)
+ self.other_steer_subscriber = rospy.Subscriber(
+ other_name + "/input/steering", Float64, self.update_other_steering_angle
+ )
+
+ rospy.Subscriber(self_name + "nav/odom", Odometry, self.update_self_odom)
self.covariance_warning_publisher = rospy.Publisher(
- buggy_name + "/debug/is_high_covariance", Bool, queue_size=1
+ self_name + "/debug/is_high_covariance", Bool, queue_size=1
)
self.steer_publisher = rospy.Publisher(
- buggy_name + "/input/steering", Float64, queue_size=1
+ self_name + "/input/steering", Float64, queue_size=1
)
self.brake_publisher = rospy.Publisher(
- buggy_name + "/input/brake", Float64, queue_size=1
+ self_name + "/input/brake", Float64, queue_size=1
)
self.brake_debug_publisher = rospy.Publisher(
- buggy_name + "/auton/debug/brake", Float64, queue_size=1
+ self_name + "/auton/debug/brake", Float64, queue_size=1
)
self.heading_publisher = rospy.Publisher(
- buggy_name + "/auton/debug/heading", Float32, queue_size=1
+ self_name + "/auton/debug/heading", Float32, queue_size=1
)
self.distance_publisher = rospy.Publisher(
- buggy_name + "/auton/debug/distance", Float64, queue_size=1
+ self_name + "/auton/debug/distance", Float64, queue_size=1
)
+
self.auton_rate = 100
self.rosrate = rospy.Rate(self.auton_rate)
+ self.profile = profile
self.tick_caller()
- def update_speed(self, msg):
+ def update_self_odom(self, msg):
with self.lock:
- self.speed = msg.data
+ self.self_odom_msg = msg
- def update_msg(self, msg):
+ def update_other_odom(self, msg):
with self.lock:
- self.msg = msg
+ self.other_odom_msg = msg
+
+ def update_other_steering_angle(self, msg):
+ with self.lock:
+ self.other_steering = msg.data
def tick_caller(self):
- while ((not rospy.is_shutdown()) and (self.msg == None)): # with no message, we wait
+ while ((not rospy.is_shutdown()) and
+ (self.self_odom_msg == None or
+ (self.has_other_buggy and self.other_odom_msg == None))): # with no message, we wait
rospy.sleep(0.001)
# wait for covariance matrix to be better
while ((not rospy.is_shutdown()) and
- (self.msg.pose.covariance[0] ** 2 + self.msg.pose.covariance[7] ** 2 > 1**2)):
+ (self.self_odom_msg.pose.covariance[0] ** 2 + self.self_odom_msg.pose.covariance[7] ** 2 > 1**2)):
# Covariance larger than one meter. We definitely can't trust the pose
rospy.sleep(0.001)
print("Waiting for Covariance to be better: ", rospy.get_rostime())
print("done checking covariance")
- while (not rospy.is_shutdown()): # start the actual control loop
- self.tick()
+
+ # initialize global trajectory index
+
+ with self.lock:
+ e, _ = self.get_world_pose_and_speed(self.self_odom_msg)
+
+ while (not rospy.is_shutdown()):
+ # start the actual control loop
+ # run the planner every 10 ticks
+ # the main cycle runs at 100hz, the planner runs at 10hz.
+ # See LOOKAHEAD_TIME in path_planner.py for the horizon of the
+ # planner. Make sure it is significantly (at least 2x) longer
+ # than 1 period of the planner when you change the planner frequency.
+
+ if not self.other_odom_msg is None and self.ticks == 0:
+ # for debugging, publish distance to other buggy
+ with self.lock:
+ self_pose, _ = self.get_world_pose_and_speed(self.self_odom_msg)
+ other_pose, _ = self.get_world_pose_and_speed(self.other_odom_msg)
+ distance = (self_pose.x - other_pose.x) ** 2 + (self_pose.y - other_pose.y) ** 2
+ distance = np.sqrt(distance)
+ self.distance_publisher.publish(Float64(distance))
+
+ # profiling
+ if self.profile:
+ cProfile.runctx('self.planner_tick()', globals(), locals(), sort="cumtime")
+ else:
+ self.planner_tick()
+
+ self.local_controller_tick()
+
self.ticks += 1
+
+ if self.ticks >= 10:
+ self.ticks = 0
+
self.rosrate.sleep()
- def tick(self):
- # Received an updated pose from the state estimator
- # Compute the new control output and publish it to the buggy
- with self.lock:
- msg = self.msg
- current_rospose = msg.pose.pose
+ def get_world_pose_and_speed(self, msg):
+ current_rospose = msg.pose.pose
# Check if the pose covariance is a sane value. Publish a warning if insane
if msg.pose.covariance[0] ** 2 + msg.pose.covariance[7] ** 2 > 1**2:
# Covariance larger than one meter. We definitely can't trust the pose
self.covariance_warning_publisher.publish(Bool(True))
else:
self.covariance_warning_publisher.publish(Bool(False))
-
current_speed = np.sqrt(
msg.twist.twist.linear.x**2 + msg.twist.twist.linear.y**2
)
# Get data from message
pose_gps = Pose.rospose_to_pose(current_rospose)
- pose = World.gps_to_world_pose(pose_gps)
+ return World.gps_to_world_pose(pose_gps), current_speed
- # Compute control output
- steering_angle = self.controller.compute_control(
- pose, self.trajectory, current_speed
- )
+ def local_controller_tick(self):
+ with self.lock:
+ self_pose, self_speed = self.get_world_pose_and_speed(self.self_odom_msg)
- # Publish control output
+ # Compute control output
+ steering_angle = self.local_controller.compute_control(
+ self_pose, self.cur_traj, self_speed)
steering_angle_deg = np.rad2deg(steering_angle)
self.steer_publisher.publish(Float64(steering_angle_deg))
- brake_cmd = self.brake_controller.compute_braking(current_speed, steering_angle_deg)
- self.brake_debug_publisher.publish(Float64(brake_cmd))
- self.brake_publisher.publish(Float64(0)) # No braking for now, just look at debug data
-
- # Publish debug data
- self.heading_publisher.publish(Float32(pose.theta))
-
- # Plot projected forward/back positions
- if (self.ticks % 50 == 0):
- self.controller.plot_trajectory(
- pose, self.trajectory, current_speed
- )
- distance_msg = Float64(self.trajectory.get_distance_from_index(
- self.controller.current_traj_index))
- self.distance_publisher.publish(distance_msg)
+ def planner_tick(self):
+ with self.lock:
+ other_pose, other_speed = self.get_world_pose_and_speed(self.other_odom_msg)
+ # update local trajectory via path planner
+ self.cur_traj = self.path_planner.compute_traj(
+ other_pose,
+ other_speed)
if __name__ == "__main__":
rospy.init_node("auton_system")
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--controller",
+ type=str,
+ help="set controller type",
+ required=True)
+
+ parser.add_argument(
+ "--dist",
+ type=float,
+ help="start buggy at meters distance along the path",
+ required=True)
+
+ parser.add_argument(
+ "--traj",
+ type=str,
+ help="path to the trajectory file, relative to /rb_ws/src/buggy/paths/",
+ required=True)
+
+ parser.add_argument(
+ "--self_name",
+ type=str,
+ help="name of ego-buggy",
+ required=True)
+
+ parser.add_argument(
+ "--other_name",
+ type=str,
+ help="name of other buggy, if left unspecified, the autonsystem assumes it is the only buggy on the course",
+ required=False)
- arg_ctrl = sys.argv[1]
- arg_start_dist = sys.argv[2]
- arg_path = sys.argv[3]
- start_dist = float(arg_start_dist)
- buggy_name = sys.argv[4]
- is_sim = sys.argv[5] == "True"
+ parser.add_argument(
+ "--profile",
+ action='store_true',
+ help="turn on profiling for the path planner")
- print("\n\nStarting Controller: " + str(arg_ctrl) + "\n\n")
- print("\n\nUsing path: /rb_ws/src/buggy/paths/" + str(arg_path) + "\n\n")
- print("\n\nStarting at distance: " + str(arg_start_dist) + "\n\n")
+ args, _ = parser.parse_known_args()
+ ctrl = args.controller
+ start_dist = args.dist
+ traj = args.traj
+ self_name = args.self_name
+ other_name = args.other_name
+ profile = args.profile
+
+ print("\n\nStarting Controller: " + str(ctrl) + "\n\n")
+ print("\n\nUsing path: /rb_ws/src/buggy/paths/" + str(traj) + "\n\n")
+ print("\n\nStarting at distance: " + str(start_dist) + "\n\n")
+
+ trajectory = Trajectory(json_filepath="/rb_ws/src/buggy/paths/" + traj)
- trajectory = Trajectory("/rb_ws/src/buggy/paths/" + arg_path)
# calculate starting index
start_index = trajectory.get_index_from_distance(start_dist)
# Add Controllers Here
- ctrller = None
- if (arg_ctrl == "stanley"):
- ctrller = StanleyController(buggy_name, start_index)
- elif (arg_ctrl == "pure_pursuit"):
- ctrller = PurePursuitController(buggy_name, start_index)
- elif (arg_ctrl == "mpc"):
- ctrller = ModelPredictiveController(buggy_name, start_index)
- if (ctrller == None):
+ local_ctrller = None
+ if (ctrl == "stanley"):
+ local_ctrller = StanleyController(
+ self_name,
+ start_index=start_index)
+
+ elif (ctrl == "pure_pursuit"):
+ local_ctrller = PurePursuitController(
+ self_name,
+ start_index=start_index)
+
+ elif (ctrl == "mpc"):
+ local_ctrller = ModelPredictiveController(
+ self_name,
+ start_index=start_index)
+
+ if (local_ctrller == None):
raise Exception("Invalid Controller Argument")
auton_system = AutonSystem(
trajectory,
- ctrller,
+ local_ctrller,
BrakeController(),
- buggy_name,
- is_sim
+ self_name,
+ other_name,
+ profile
)
+
while not rospy.is_shutdown():
rospy.spin()
diff --git a/rb_ws/src/buggy/scripts/auton/controller.py b/rb_ws/src/buggy/scripts/auton/controller.py
index 5cd3aaf5..cf7120f8 100755
--- a/rb_ws/src/buggy/scripts/auton/controller.py
+++ b/rb_ws/src/buggy/scripts/auton/controller.py
@@ -1,9 +1,7 @@
from abc import ABC, abstractmethod
-import numpy as np
from trajectory import Trajectory
from pose import Pose
-import rospy
from sensor_msgs.msg import NavSatFix
from world import World
@@ -19,7 +17,6 @@ class Controller(ABC):
Example schemes include Pure Pursuit, Stanley, and LQR.
"""
- # TODO: add case for buggy intrinsics
NAND_WHEELBASE = 1.3
SC_WHEELBASE = 1.104
WHEELBASE = SC_WHEELBASE
@@ -27,21 +24,21 @@ class Controller(ABC):
def __init__(self, start_index, buggy_name) -> None:
self.buggy_name = buggy_name
- self.trajectory_forward_1 = rospy.Publisher(
- buggy_name + "/auton/debug/forward1_navsat", NavSatFix, queue_size=1
- )
- self.trajectory_forward_2 = rospy.Publisher(
- buggy_name + "/auton/debug/forward2_navsat", NavSatFix, queue_size=1
- )
- self.trajectory_forward_3 = rospy.Publisher(
- buggy_name + "/auton/debug/forward3_navsat", NavSatFix, queue_size=1
- )
- self.trajectory_backward_1 = rospy.Publisher(
- buggy_name + "/auton/debug/backward1_navsat", NavSatFix, queue_size=1
- )
- # Make lists of publishers for easy iteration
- self.forward_publishers = [self.trajectory_forward_1, self.trajectory_forward_2, self.trajectory_forward_3]
- self.backward_publishers = [self.trajectory_backward_1]
+ # self.trajectory_forward_1 = rospy.Publisher(
+ # buggy_name + "/auton/debug/forward1_navsat", NavSatFix, queue_size=1
+ # )
+ # self.trajectory_forward_2 = rospy.Publisher(
+ # buggy_name + "/auton/debug/forward2_navsat", NavSatFix, queue_size=1
+ # )
+ # self.trajectory_forward_3 = rospy.Publisher(
+ # buggy_name + "/auton/debug/forward3_navsat", NavSatFix, queue_size=1
+ # )
+ # self.trajectory_backward_1 = rospy.Publisher(
+ # buggy_name + "/auton/debug/backward1_navsat", NavSatFix, queue_size=1
+ # )
+ # # Make lists of publishers for easy iteration
+ # self.forward_publishers = [self.trajectory_forward_1, self.trajectory_forward_2, self.trajectory_forward_3]
+ # self.backward_publishers = [self.trajectory_backward_1]
self.current_traj_index = start_index
@abstractmethod
@@ -60,7 +57,7 @@ def compute_control(
float (desired steering angle)
"""
raise NotImplementedError
-
+
def plot_trajectory(
self, current_pose: Pose, trajectory: Trajectory, current_speed: float
):
diff --git a/rb_ws/src/buggy/scripts/auton/model_predictive_controller.py b/rb_ws/src/buggy/scripts/auton/model_predictive_controller.py
index e2af6ea5..6949c266 100644
--- a/rb_ws/src/buggy/scripts/auton/model_predictive_controller.py
+++ b/rb_ws/src/buggy/scripts/auton/model_predictive_controller.py
@@ -1,29 +1,25 @@
#!/usr/bin/env python3
-from abc import ABC, abstractmethod
import time
+from threading import Lock
from numba import njit
import numpy as np
import osqp
-import scipy
from scipy import sparse
import rospy
-from std_msgs.msg import Float32, Float64, Float64MultiArray, MultiArrayDimension, Bool
-from nav_msgs.msg import Odometry
+from std_msgs.msg import Float64, Float64MultiArray, MultiArrayDimension, Bool
from sensor_msgs.msg import NavSatFix
from geometry_msgs.msg import Pose as ROSPose
from geometry_msgs.msg import Pose2D
-from rospy.numpy_msg import numpy_msg
from pose import Pose
from trajectory import Trajectory
from controller import Controller
from world import World
-from threading import Lock
import matplotlib.pyplot as plt
@@ -33,7 +29,7 @@ class ModelPredictiveController(Controller):
Convex Model Predictive Controller (MPC)
"""
- DEBUG = False
+ DEBUG = True
PLOT = False
TIME = False
ROS = True
@@ -437,6 +433,7 @@ def compute_trajectory(self, current_pose: Pose, trajectory: Trajectory, current
# 0 0 0 A2 B2 ... 0 0 0
# 0 0 0 0 0 ... 0 0 0
# 0 0 0 0 0 ... AN-1 BN-1 -I]
+ #
# D = [C; X; U]
# X selects all the states from z
# U selects all the controls from z
@@ -509,7 +506,28 @@ def compute_trajectory(self, current_pose: Pose, trajectory: Trajectory, current
np.zeros((self.N_STATES * self.MPC_HORIZON, self.N_STATES)),
)
)
- D = sparse.vstack([self.C + C1, self.X, self.U])
+
+ # halfplane constraint
+ # c = [n 0 0], where n is the normal vector of the halfplane in x-y space
+ # p is the position of NAND in x-y space
+
+ n = np.array([100, 100])
+ p = np.array([0, 1])
+ c = np.concatenate((n, np.zeros((2, )))).reshape(1, self.N_STATES)
+
+ C2 = sparse.kron(
+ np.eye(self.MPC_HORIZON),
+ np.hstack(
+ (
+ np.zeros((1, self.N_CONTROLS)),
+ c
+ )
+ ),
+ format="csc",
+ )
+
+
+ D = sparse.vstack([self.C + C1, self.X, self.U, C2])
if self.TIME:
create_mat_time_D = 1000.0 * (time.time() - t)
@@ -524,6 +542,7 @@ def compute_trajectory(self, current_pose: Pose, trajectory: Trajectory, current
np.zeros(self.N_STATES * (self.MPC_HORIZON - 1)),
np.tile(self.state_lb, self.MPC_HORIZON) + reference_trajectory.ravel(),
np.tile(self.control_lb, self.MPC_HORIZON) + reference_control.ravel(),
+ np.tile(n.T @ p, self.MPC_HORIZON),
)
)
ub = np.hstack(
@@ -533,6 +552,7 @@ def compute_trajectory(self, current_pose: Pose, trajectory: Trajectory, current
np.zeros(self.N_STATES * (self.MPC_HORIZON - 1)),
np.tile(self.state_ub, self.MPC_HORIZON) + reference_trajectory.ravel(),
np.tile(self.control_ub, self.MPC_HORIZON) + reference_control.ravel(),
+ np.tile(np.inf, self.MPC_HORIZON),
)
)
@@ -577,11 +597,17 @@ def compute_trajectory(self, current_pose: Pose, trajectory: Trajectory, current
if self.TIME:
t = time.time()
results = self.solver.solve()
+
steer_angle = results.x[self.N_CONTROLS + self.N_STATES - 1]
solution_trajectory = np.reshape(results.x, (self.MPC_HORIZON, self.N_STATES + self.N_CONTROLS))
state_trajectory = solution_trajectory[:, self.N_CONTROLS:(self.N_CONTROLS + self.N_STATES)]
+
+ print("status", results.info.status, results.info.status_val)
+ if not (results.info.status == "solved" or results.info.status == "solved inaccurate"):
+ return reference_trajectory
+
state_trajectory += reference_trajectory
- steer_rate_trajectory = solution_trajectory[:, :self.N_CONTROLS]
+ # steer_rate_trajectory = solution_trajectory[:, :self.N_CONTROLS]
if self.TIME:
solve_time = 1000 * (time.time() - t)
@@ -597,7 +623,7 @@ def compute_trajectory(self, current_pose: Pose, trajectory: Trajectory, current
if self.PLOT:
# Plot the results
- fig, [[ax1, ax2], [ax3, ax4]] = plt.subplots(2, 2)
+ _, [[ax1, ax2], [ax3, ax4]] = plt.subplots(2, 2)
# Extract the states
states = np.zeros((self.MPC_HORIZON, self.N_STATES))
diff --git a/rb_ws/src/buggy/scripts/auton/occupancy_grid/grid_manager.py b/rb_ws/src/buggy/scripts/auton/occupancy_grid/grid_manager.py
index 212f5f0d..c2bb4164 100644
--- a/rb_ws/src/buggy/scripts/auton/occupancy_grid/grid_manager.py
+++ b/rb_ws/src/buggy/scripts/auton/occupancy_grid/grid_manager.py
@@ -1,13 +1,16 @@
#!/usr/bin/env python3
+import sys
+import json
+
import numpy as np
+# import scipy
import cv2
-import sys
-from matplotlib import pyplot as plt
+# from matplotlib import pyplot as plt
import utm
+
sys.path.append('rb_ws/src/buggy/scripts/auton')
# from pose import Pose
-import json
"""
####################################
@@ -27,14 +30,14 @@
INIT_WITH_EMPTY_GRID = True
class OccupancyGrid:
def __init__(self):
-
+
# Grid 0, 0 is NW
# 1 pixel = 0.5m
# X = East
# Y = North
-
+
# the original grid, should never change
- self.sat_img = np.array(cv2.cvtColor(cv2.imread("rb_ws/src/buggy/assets/sat_img_resized.png"), cv2.COLOR_BGR2GRAY), np.uint8)
+ self.sat_img = np.array(cv2.cvtColor(cv2.imread("/rb_ws/src/buggy/assets/sat_img_resized.png"), cv2.COLOR_BGR2GRAY), np.uint8)
# For original sat_img.png
# 145.77 pixels = 88.62m
@@ -45,13 +48,13 @@ def __init__(self):
self.grid_og = np.zeros(np.shape(self.sat_img))
self.grid = np.zeros(np.shape(self.sat_img))
else:
- self.grid_og = np.array(cv2.cvtColor(cv2.imread("rb_ws/src/buggy/assets/sat_img_resized.png"), cv2.COLOR_BGR2GRAY), np.uint8)
+ self.grid_og = np.array(cv2.cvtColor(cv2.imread("/rb_ws/src/buggy/assets/sat_img_resized.png"), cv2.COLOR_BGR2GRAY), np.uint8)
# this grid will vary from call to call since NAND will be plotted in here and removed by reverting it back to grid_og
- self.grid = np.array(cv2.cvtColor(cv2.imread("rb_ws/src/buggy/assets/cost_grid.png"), cv2.COLOR_BGR2GRAY), np.uint8)
+ self.grid = np.array(cv2.cvtColor(cv2.imread("/rb_ws/src/buggy/assets/cost_grid.png"), cv2.COLOR_BGR2GRAY), np.uint8)
+
-
- correspondence_f = open("rb_ws/src/buggy/assets/landmarks.json")
+ correspondence_f = open("/rb_ws/src/buggy/assets/landmarks.json")
self.correspondence = json.load(correspondence_f)
correspondence_f.close()
@@ -68,10 +71,10 @@ def __init__(self):
sat_img_pixel = self.correspondence[i]["pixel"]
sat_img_pixel.append(1) # homogenous coordinates
pts_dst.append(sat_img_pixel)
-
+
self.pts_src = np.array(pts_src)
self.pts_dst = np.array(pts_dst)
-
+
# REF_LOC_UTM_1 = utm.from_latlon(40.438834, -79.946334)
# REF_LOC_UTM_1 = [REF_LOC_UTM_1[0], REF_LOC_UTM_1[1], 1]
@@ -86,7 +89,7 @@ def __init__(self):
# SAT_IMG_LOC_2 = [1043, 1048, 1]
# SAT_IMG_LOC_3 = [1128, 290, 1]
# SAT_IMG_LOC_4 = [406, 366, 1]
-
+
# pts_src = np.array([REF_LOC_UTM_1, REF_LOC_UTM_2, REF_LOC_UTM_3, REF_LOC_UTM_4])
# pts_dst = np.array([SAT_IMG_LOC_1, SAT_IMG_LOC_2, SAT_IMG_LOC_3, SAT_IMG_LOC_4])
@@ -94,7 +97,7 @@ def __init__(self):
if not np.allclose(status, 1, atol=1e-5):
raise Exception("Cannot compute homography")
-
+
def get_pixel_from_utm(self, utm_coord: np.array):
"""Calculate the pixel coordinates from the utm coordinates
@@ -129,7 +132,7 @@ def get_pixel_from_latlon(self, latlon_coord: np.array):
utm_coord_homogenous = np.array([utm_coord[0], utm_coord[1], ones])
loc = self.homography @ utm_coord_homogenous
return np.array([loc[0]/loc[2], loc[1]/loc[2]]).T
-
+
def plot_points(self, utm_coords: list):
"""Mark the points on the grid completely BLACK
@@ -145,10 +148,10 @@ def plot_points(self, utm_coords: list):
self.grid[pixel_coords[:, 0], pixel_coords[:, 1]] = 255
def set_cost(self, utm_coords: list, cost: list):
- """Set the grid's cost
+ """Set the grid's cost
Args:
- utm_coords (list): list of pixel coordinates to mark (x, y)
+ utm_coords (list): list of utm coordinates to mark (x, y)
[[0,0],
[100, 100],
[125, 400]] as an example
@@ -157,14 +160,14 @@ def set_cost(self, utm_coords: list, cost: list):
utm_coords = np.array(utm_coords)
utm_coords = utm_coords[:, 0:2]
pxl_coords = self.get_pixel_from_utm(utm_coords).astype(int)
- print(pxl_coords)
+
self.grid[pxl_coords[:, 0], pxl_coords[:, 1]] = cost
def set_cost_persistent(self, utm_coords: list, cost: list):
"""Set the grid's cost but further calls to reset_grid() will not remove these points
Args:
- utm_coords (list): list of pixel coordinates to mark (x, y)
+ utm_coords (list): list of utm coordinates to mark (x, y)
[[0,0],
[100, 100],
[125, 400]] as an example
@@ -181,26 +184,25 @@ def reset_grid(self):
"""
self.grid = np.copy(self.grid_og)
- def get_utm_cost(self, utm_coords: list):
+ def get_utm_cost(self, coords):
"""Get the cost of the trajectory passed in as utm_coordinates
Args:
- utm_coords (list): coordinates in utm
- [[0,0],
+ coords (np array): coordinates in utm
+ [[0,0],
[100, 100],
[125, 400]] as an example
Returns:
normalized cost: sum of all the pixels / number of pixels crossed
"""
- coords = np.array(utm_coords)
utm_coords = coords[:, 0:2]
pxl_coords = self.get_pixel_from_utm(utm_coords).astype(int)
filtered_pxl_coords = np.unique(pxl_coords[:, 0:2], axis=0)
num_pixels_passed_thru = len(filtered_pxl_coords)
total = np.sum(self.grid[filtered_pxl_coords[:, 0], filtered_pxl_coords[:, 1]])
return total/num_pixels_passed_thru
-
+
def get_pxl_cost(self, pxl_coords: list):
"""Get the cost of the trajectory passed in as pxl_coordinates
diff --git a/rb_ws/src/buggy/scripts/auton/path_planner.py b/rb_ws/src/buggy/scripts/auton/path_planner.py
new file mode 100755
index 00000000..1d42592e
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/auton/path_planner.py
@@ -0,0 +1,146 @@
+import numpy as np
+
+import rospy
+from sensor_msgs.msg import NavSatFix
+from std_msgs.msg import Float64
+from pose import Pose
+
+from occupancy_grid.grid_manager import OccupancyGrid
+from path_projection import Projector
+from trajectory import Trajectory
+from world import World
+class PathPlanner():
+ LOOKAHEAD_TIME = 2.0 #s
+ RESOLUTION = 30 #samples/s
+ PASSING_OFFSET = 2 #m
+ # in meters, the number of meters behind NAND before we start morphing the trajectory
+ REAR_MARGIN = 10 #m
+
+ # in meters, the number of meters in front of NAND,
+ # before the morphed trajectory rejoins the nominal trajectory
+ # WARNING: set this value to be greater than 15m/s * lookahead time (10 m/s is the upper limit
+ # of NAND speed) Failure to do so can result violent u-turns in the new trajectory.
+ FRONT_MARGIN = 35 #m
+
+ def __init__(self, nominal_traj:Trajectory) -> None:
+ self.occupancy_grid = OccupancyGrid()
+
+ # TODO: update with NAND wheelbase
+ self.path_projector = Projector(1.3)
+
+ self.debug_passing_traj_publisher = rospy.Publisher(
+ "/auton/debug/passing_traj", NavSatFix, queue_size=1000
+ )
+
+ self.debug_splice_pt_publisher = rospy.Publisher(
+ "/auton/debug/splice_pts", NavSatFix, queue_size=1000
+ )
+
+
+ self.debug_grid_cost_publisher = rospy.Publisher(
+ "/auton/debug/grid_cost", Float64, queue_size=0
+ )
+ self.nominal_traj = nominal_traj
+
+ # TODO: estimate this value based on the curvature of NAND's recent positions
+ self.other_steering_angle = 0
+
+ def compute_traj(
+ self,
+ other_pose: Pose, #Currently NAND's location -- To be Changed
+ other_speed: float) -> Trajectory:
+ """
+ draw trajectory, such that the section of the
+ trajectory near NAND is replaced by a new segment:
+ #1. the nominal trajectory, until REAR_MARGIN behind NAND
+ #2. passing targets
+ #3. nominal trajectory, starting at FRONT_MARGIN ahead of NAND
+
+ To generate the passing targets, we take NAND's future positions,
+ shifted along normal of nominal trajectory by PASSING_OFFSET
+
+ Since they are shifted by a fixed distance, this approach assumes NAND's
+ trajectory is similar to that of ego-buggy (short-circuit).
+
+ TODO: shift the future positions while taking into account smoothness of the path,
+ as well as curbs, such that even if NAND's trajectory is different from ego-buggy,
+ the generated passing targets will be safe.
+
+ Args:
+ other_pose (Pose): Pose containing NAND's easting (x),
+ northing(y), and heading (theta), in "world" cooridnates,
+ which is UTM, shifted so that the origin is near the course.
+
+ See world.py
+
+ other_speed (float): speed in m/s of NAND
+
+ Returns:
+ Trajectory: new trajectory object for ego-buggy to follow.
+ """
+
+ other_idx:float = self.nominal_traj.get_closest_index_on_path(
+ other_pose.x,
+ other_pose.y)
+ #other is just NAND, for general purposes consider it other
+
+ new_segment_start_idx:float = self.nominal_traj.get_index_from_distance(
+ self.nominal_traj.get_distance_from_index(other_idx) - self.REAR_MARGIN
+ ) #Where new path index starts, CONSTANT delta from NAND
+
+ new_segment_end_idx:float = self.nominal_traj.get_index_from_distance(
+ self.nominal_traj.get_distance_from_index(other_idx) + self.FRONT_MARGIN
+ )
+
+ # project other buggy path
+ # TODO: put other buggy command
+ other_future_poses:list = self.path_projector.project(
+ other_pose,
+ self.other_steering_angle,
+ other_speed,
+ self.LOOKAHEAD_TIME,
+ self.RESOLUTION)
+
+ other_future_poses_idxs = np.empty((len(other_future_poses), ))
+
+ # TODO: optimize this lookup -- how tho
+ for i in range(len(other_future_poses)):
+ other_future_poses_idxs[i] = self.nominal_traj.get_closest_index_on_path(
+ other_future_poses[i][0],
+ other_future_poses[i][1],
+ start_index=other_idx)
+
+ future_pose_unit_normal:np.typing.NDArray = self.nominal_traj.get_unit_normal_by_index(
+ other_future_poses_idxs
+ )
+ passing_targets = other_future_poses + self.PASSING_OFFSET * future_pose_unit_normal
+
+ pre_slice = self.nominal_traj.positions[:int(new_segment_start_idx), :]
+ post_slice = self.nominal_traj.positions[int(new_segment_end_idx):, :]
+ new_path = np.vstack((pre_slice, passing_targets, post_slice))
+
+ # publish passing targets for debugging
+ for i in range(len(passing_targets)):
+ reference_navsat = NavSatFix()
+ ref_gps = World.world_to_gps(passing_targets[i, 0], passing_targets[i, 1])
+ reference_navsat.latitude = ref_gps[0]
+ reference_navsat.longitude = ref_gps[1]
+ self.debug_passing_traj_publisher.publish(reference_navsat)
+
+ # for debugging:
+ # publish the first and last point of the part of the original trajectory
+ # that got spliced out
+ reference_navsat = NavSatFix()
+ ref_gps = World.world_to_gps(*self.nominal_traj.get_position_by_index(int(new_segment_start_idx)))
+ reference_navsat.latitude = ref_gps[0]
+ reference_navsat.longitude = ref_gps[1]
+ self.debug_splice_pt_publisher.publish(reference_navsat)
+
+ ref_gps = World.world_to_gps(*self.nominal_traj.get_position_by_index(int(new_segment_end_idx)))
+ reference_navsat.latitude = ref_gps[0]
+ reference_navsat.longitude = ref_gps[1]
+ self.debug_splice_pt_publisher.publish(reference_navsat)
+
+
+ # generate new path
+ return Trajectory(json_filepath=None, positions=new_path)
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/auton/path_projection.py b/rb_ws/src/buggy/scripts/auton/path_projection.py
old mode 100644
new mode 100755
index 3530b248..98b86105
--- a/rb_ws/src/buggy/scripts/auton/path_projection.py
+++ b/rb_ws/src/buggy/scripts/auton/path_projection.py
@@ -1,3 +1,4 @@
+# import time
import numpy as np
from pose import Pose
@@ -8,43 +9,52 @@ class Projector:
def __init__(self, wheelbase: float):
self.wheelbase = wheelbase
-
- def project(self, pose: Pose, command: float, v: float, time: float, resolution: int) -> list:
+
+ def project(self, pose: Pose, command: float, v: float, delta_t: float, resolution: int) -> list:
"""
- Project buggy motion analytically. Assumes constant velocity and turning angle for the duration.
+ Project buggy motion analytically. Assumes constant velocity and turning angle for the duration.
Args:
- pose (Pose): Pose containing utm_e, utm_n, and heading, in UTM coords and degrees
+ pose (Pose): Pose containing utm_e, utm_n, and heading, in UTM coords and radians
command (float): Turning angle, in degrees
v (float): Buggy velocity, in m/s
- time (float): Time to look ahead, in s
+ delta_t (float): Time to look ahead, in s
resolution (int): Number of points to output per second
Returns:
- list: List containing 3-tuples of utm_e, utm_n, heading positions along the projected arc.
+ list: 2D List with shape ((time * resolutions), (2)),
+ where the ith row is [utm_e, utm_n] at ith prediction
"""
+
+ # t_0 = time.perf_counter_ns()
+
dtheta = v * np.tan(np.deg2rad(command)) / self.wheelbase
ts = 1/resolution
- t = 0
- output = []
-
- for i in range(int(resolution * time)):
- t += ts
- theta = t * dtheta + np.deg2rad(pose.theta)
- if dtheta != 0:
- x = pose.x + (v / dtheta) * np.sin(theta)
- y = pose.y + (v / dtheta) * (1-np.cos(theta))
- else:
- x = pose.x + t * v * np.cos(theta)
- y = pose.y + t * v * np.sin(theta)
-
- output.append((x, y, np.rad2deg(theta)))
-
+ t = np.arange(0, delta_t, ts)
+ theta_0 = pose.theta
+ theta = t * dtheta + theta_0
+
+ if dtheta != 0:
+ # for each t,
+ # do the integral of cos(theta(t)) over t for x, between t=0 and t=delta_t
+ # do integral of sin(theta(t)) for y.
+ # theta(t) = theta_0 + delta_t * dtheta
+ v_over_dtheta = v / dtheta
+ x = pose.x + v_over_dtheta * (np.sin(theta) - np.sin(theta_0))
+ y = pose.y + v_over_dtheta * (-np.cos(theta) + np.cos(theta_0))
+ else:
+ x = pose.x + t * v * np.cos(theta)
+ y = pose.y + t * v * np.sin(theta)
+
+ output = np.vstack((x, y)).T
+ # t_1 = time.perf_counter_ns()
+ # print("time per point (us):", (t_1 - t_0)/(1000 * resolution * delta_t))
+
return output
def project_discrete(self, pose: Pose, command: float, v: float, time: float, resolution: int, sim_rate: int) -> list:
"""
- Project buggy motion discretely, performing kinematics at each sim_ts. Assumes constant velocity and turning angle for the duration.
+ Project buggy motion discretely, performing kinematics at each sim_ts. Assumes constant velocity and turning angle for the duration.
Args:
pose (Pose): Pose containing utm_e, utm_n, and heading, in UTM coords and degrees
@@ -76,5 +86,5 @@ def project_discrete(self, pose: Pose, command: float, v: float, time: float, re
if t >= next_out:
output.append((x, y, np.rad2deg(theta)))
next_out += ts
-
+
return output
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/auton/stanley_controller.py b/rb_ws/src/buggy/scripts/auton/stanley_controller.py
index f2138a33..73e9f76e 100644
--- a/rb_ws/src/buggy/scripts/auton/stanley_controller.py
+++ b/rb_ws/src/buggy/scripts/auton/stanley_controller.py
@@ -1,5 +1,3 @@
-from abc import ABC, abstractmethod
-
import numpy as np
import rospy
@@ -23,7 +21,7 @@ class StanleyController(Controller):
MAX_LOOK_AHEAD_DIST = 2
CROSS_TRACK_GAIN = 1
- HEADING_GAIN = 0.75
+ HEADING_GAIN = 0.3
def __init__(self, buggy_name, start_index=0) -> None:
super(StanleyController, self).__init__(start_index, buggy_name)
@@ -63,8 +61,8 @@ def compute_control(
traj_index = trajectory.get_closest_index_on_path(
front_x,
front_y,
- start_index=self.current_traj_index,
- end_index=self.current_traj_index + 10,
+ start_index=self.current_traj_index - 20,
+ end_index=self.current_traj_index + 50,
)
self.current_traj_index = max(traj_index, self.current_traj_index)
diff --git a/rb_ws/src/buggy/scripts/auton/trajectory.py b/rb_ws/src/buggy/scripts/auton/trajectory.py
index 855c3f2d..92822088 100755
--- a/rb_ws/src/buggy/scripts/auton/trajectory.py
+++ b/rb_ws/src/buggy/scripts/auton/trajectory.py
@@ -1,5 +1,8 @@
-import numpy as np
import json
+import uuid
+import matplotlib.pyplot as plt
+import numpy as np
+
from scipy.interpolate import Akima1DInterpolator, CubicSpline
from world import World
@@ -20,37 +23,52 @@ class Trajectory:
Use https://rdudhagra.github.io/eracer-portal/ to make trajectories and save the JSON file
"""
- def __init__(self, json_filepath, interpolator="CubicSpline") -> None:
+ def __init__(self, json_filepath=None, positions = None, interpolator="CubicSpline") -> None:
+ """
+ Args:
+ json_filepath (String): file path to the path json file (begins at /rb_ws)
+ positions [[float, float]]: reference trajectory in world coordinates
+ current_speed (float): current speed of the buggy
+
+ Returns:
+ float (desired steering angle)
+ """
self.distances = np.zeros((0, 1)) # (N/dt x 1) [d, d, ...]
self.positions = np.zeros((0, 2)) # (N x 2) [(x,y), (x,y), ...]
self.indices = None # (N x 1) [0, 1, 2, ...]
self.interpolation = None # scipy.interpolate.PPoly
-
- pos = []
- # Load the json file
- with open(json_filepath, "r") as f:
- data = json.load(f)
- # Iterate through the waypoints and extract the positions
- num_waypoints = len(data)
- for i in range(0, num_waypoints):
+ # read positions from file
+ if positions is None:
+ positions = []
+ # Load the json file
+ with open(json_filepath, "r") as f:
+ data = json.load(f)
+
+ # Iterate through the waypoints and extract the positions
+ num_waypoints = len(data)
+ for i in range(0, num_waypoints):
+
+ waypoint = data[i]
+
+ lat = waypoint["lat"]
+ lon = waypoint["lon"]
- waypoint = data[i]
+ # Convert to world coordinates
+ x, y = World.gps_to_world(lat, lon)
+ positions.append([x, y])
- lat = waypoint["lat"]
- lon = waypoint["lon"]
+ positions = np.array(positions)
- # Convert to world coordinates
- x, y = World.gps_to_world(lat, lon)
- pos.append([x, y])
- num_indices = len(pos)
+ num_indices = positions.shape[0]
if interpolator == "Akima":
- self.positions = np.array(pos)
+ self.positions = positions
self.indices = np.arange(num_indices)
self.interpolation = Akima1DInterpolator(self.indices, self.positions)
- else:
- temp_traj = Trajectory(json_filepath, interpolator="Akima")
+ self.interpolation.extrapolate = True
+ elif interpolator == "CubicSpline":
+ temp_traj = Trajectory(positions=positions, interpolator="Akima")
tot_len = temp_traj.distances[-1]
interp_dists = np.linspace(0, tot_len, num_indices)
@@ -62,15 +80,15 @@ def __init__(self, json_filepath, interpolator="CubicSpline") -> None:
self.positions = np.array(self.positions)
self.interpolation = CubicSpline(self.indices, self.positions)
- self.interpolation.extrapolate = True
+ self.interpolation.extrapolate = True
# Calculate the distances along the trajectory
- dt = 0.01
- ts = np.arange(len(self.positions) - 1, step=dt)
+ dt = 0.01 #dt is time step (in seconds (?))
+ ts = np.arange(len(self.positions) - 1, step=dt) # times corresponding to each position (?)
drdt = self.interpolation(
ts, nu=1
- ) # Calculate derivatives of polynomial wrt indices
- ds = np.sqrt(drdt[:, 0] ** 2 + drdt[:, 1] ** 2) * dt
+ ) # Calculate derivatives of interpolated path wrt indices
+ ds = np.sqrt(drdt[:, 0] ** 2 + drdt[:, 1] ** 2) * dt # distances to each interpolated point
s = np.cumsum(np.hstack([[0], ds[:-1]]))
self.distances = s
self.dt = dt
@@ -266,6 +284,22 @@ def get_dynamics_by_index(self, index, wheelbase):
return np.stack((x, y, theta, np.arctan(wheelbase * curvature)), axis=-1)
+ def get_unit_normal_by_index(self, index):
+ """Gets the index of the closest point on the trajectory to the given point
+
+ Args:
+ index: Nx1 numpy array: indexes along trajectory
+ Returns:
+ Nx2 numpy array: unit normal of the trajectory at index
+ """
+
+ derivative = self.interpolation(index, nu=1)
+ unit_derivative = derivative / np.linalg.norm(derivative, axis=1)[:, None]
+
+ # (x, y), rotated by 90 deg ccw = (-y, x)
+ unit_normal = np.vstack((-unit_derivative[:, 1], unit_derivative[:, 0])).T
+ return unit_normal
+
def get_closest_index_on_path(
self, x, y, start_index=0, end_index=None, subsample_resolution=10000
):
@@ -276,37 +310,37 @@ def get_closest_index_on_path(
y (float): y coordinate
start_index (int, optional): index to start searching from. Defaults to 0.
end_index (int, optional): index to end searching at. Defaults to None (disable).
-
+ subsample_resolution: resolution of the resulting interpolation
Returns:
- int: index along the trajectory
+ float: index along the trajectory
"""
# If end_index is not specified, use the length of the trajectory
if end_index is None:
- end_index = len(self.positions)
+ end_index = len(self.positions) #sketch, 0-indexing where??
# Floor/ceil the start/end indices
- start_index = int(np.floor(start_index))
+ start_index = max(0, int(np.floor(start_index)))
end_index = int(np.ceil(end_index))
# Calculate the distance from the point to each point on the trajectory
distances = (self.positions[start_index : end_index + 1, 0] - x) ** 2 + (
self.positions[start_index : end_index + 1, 1] - y
- ) ** 2
+ ) ** 2 #Don't need to squareroot as it is a relative distance
min_ind = np.argmin(distances) + start_index
- start_index = max(0, min_ind - 1)
- end_index = min(len(self.positions), min_ind + 1)
+ start_index = max(0, min_ind - 1) #Protection in case min_ind is too low
+ end_index = min(len(self.positions), min_ind + 1) #Prtoecting in case min_ind too high
+ #Theoretically start_index and end_index are just two apart
# Now interpolate at a higher resolution to get a more accurate result
r_interp = self.interpolation(
- np.linspace(start_index, end_index, subsample_resolution + 1)
- )
- x_interp, y_interp = r_interp[:, 0], r_interp[:, 1]
+ np.linspace(start_index, end_index, subsample_resolution + 1) )
+ x_interp, y_interp = r_interp[:, 0], r_interp[:, 1] #x_interp, y_interp are numpy column vectors
- distances = (x_interp - x) ** 2 + (y_interp - y) ** 2
+ distances = (x_interp - x) ** 2 + (y_interp - y) ** 2 #Again distances are relative
- # Return the index of the closest point
+ # Return the rational index of the closest point
return (
np.argmin(distances) / subsample_resolution * (end_index - start_index)
+ start_index
@@ -317,8 +351,6 @@ def get_closest_index_on_path(
# Example usage
trajectory = Trajectory("/rb_ws/src/buggy/paths/quartermiletrack.json")
- import json
- import uuid
interp_dat = []
for k in np.linspace(0, trajectory.indices[-1], 500):
@@ -329,7 +361,6 @@ def get_closest_index_on_path(
{"lat": lat, "lon": lon, "key": str(uuid.uuid4()), "active": False}
)
- import matplotlib.pyplot as plt
ts = np.linspace(0, trajectory.indices[-1], 500)
kurv = [trajectory.get_curvature_by_index(t) for t in ts]
diff --git a/rb_ws/src/buggy/scripts/auton/world.py b/rb_ws/src/buggy/scripts/auton/world.py
index 3684a849..21e5b2da 100755
--- a/rb_ws/src/buggy/scripts/auton/world.py
+++ b/rb_ws/src/buggy/scripts/auton/world.py
@@ -49,6 +49,75 @@ def gps_to_world(lat, lon):
return x, y
+ @staticmethod
+ def utm_to_world_pose(pose: Pose) -> Pose:
+ """Converts UTM coordinates to world coordinates
+
+ Args:
+ pose (Pose): pose with utm coordinates
+
+ Returns:
+ Pose: pose with world coordinates
+ """
+
+ utm_e = pose.x
+ utm_n = pose.y
+ x = utm_e - World.WORLD_EAST_ZERO
+ y = utm_n - World.WORLD_NORTH_ZERO
+ return Pose(x, y, pose.theta)
+
+ @staticmethod
+ def world_to_utm_pose(pose: Pose) -> Pose:
+ """Converts world coordinates to utm coordinates
+
+ Args:
+ pose (Pose): pose with world coordinates
+
+ Returns:
+ Pose: pose with world coordinates
+ """
+
+ utm_e = pose.x + World.WORLD_EAST_ZERO
+ utm_n = pose.y + World.WORLD_NORTH_ZERO
+ return Pose(utm_e, utm_n, pose.theta)
+
+ @staticmethod
+ def world_to_utm_numpy(coords):
+ """Converts world coordinates to utm coordinates
+
+ Args:
+ coords (numpy.ndarray [size: (N,2)]): array of x, y pairs
+
+ Returns:
+ numpy.ndarray [size: (N,2)]: array of utm_e, utm_n pairs
+ """
+
+ N = np.shape(coords)[0]
+ utm_offset_e = np.ones((N, )) * World.WORLD_EAST_ZERO
+ utm_offset_n = np.ones((N, )) * World.WORLD_NORTH_ZERO
+ utm_offset = np.stack((utm_offset_e, utm_offset_n), axis=1)
+
+ return coords + utm_offset
+
+ @staticmethod
+ def utm_to_world_numpy(coords):
+ """Converts utm coordinates to world coordinates
+
+ Args:
+ coords (numpy.ndarray [size: (N,2)]): array of utm_e, utm_n pairs
+
+ Returns:
+ numpy.ndarray [size: (N,2)]: array of x, y pairs
+ """
+
+ N = np.shape(coords)[0]
+ utm_offset_e = np.ones((N, )) * World.WORLD_EAST_ZERO
+ utm_offset_n = np.ones((N, )) * World.WORLD_NORTH_ZERO
+ utm_offset = np.stack((utm_offset_e, utm_offset_n), axis=1)
+
+ return coords - utm_offset
+
+
@staticmethod
def world_to_gps(x, y):
"""Converts world coordinates to GPS coordinates
@@ -68,6 +137,25 @@ def world_to_gps(x, y):
return lat, lon
+ @staticmethod
+ def utm_to_gps(x, y):
+ """Converts utm coordinates to GPS coordinates
+
+ Args:
+ x (float): x in meters, in utm frame
+ y (float): y in meters, in utm frame
+
+ Returns:
+ tuple: (lat, lon)
+ """
+ utm_coords = utm.to_latlon(
+ x, y, 17, "T"
+ )
+ lat = utm_coords[0]
+ lon = utm_coords[1]
+
+ return lat, lon
+
@staticmethod
def gps_to_world_numpy(coords):
"""Converts GPS coordinates to world coordinates