Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Commit

Permalink
feat(#67): Added first implementation of Pure Pursuit Controller
Browse files Browse the repository at this point in the history
  • Loading branch information
Julian-Graf committed Jan 5, 2023
1 parent 860fbc4 commit 766c67b
Showing 1 changed file with 100 additions and 13 deletions.
113 changes: 100 additions & 13 deletions code/acting/src/acting/pure_pursuit_controller.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
#!/usr/bin/env python
from math import atan, sin

import numpy
import ros_compatibility as roscomp
# from carla_msgs.msg import CarlaGnssRoute
from carla_msgs.msg import CarlaSpeedometer
from geometry_msgs.msg import Point, Pose
from nav_msgs.msg import Path
from ros_compatibility.node import CompatibleNode
from ros_compatibility.qos import QoSProfile, DurabilityPolicy
from rospy import Publisher, Subscriber
from sensor_msgs.msg import NavSatFix, Imu
from std_msgs.msg import Float32, Bool
from tf.transformations import euler_from_quaternion


# todo: docs
# todo: test
class PurePursuitController(CompatibleNode):
def __init__(self):
super(PurePursuitController, self).__init__('pure_pursuit_controller')
Expand All @@ -16,17 +24,29 @@ def __init__(self):
self.control_loop_rate = self.get_param('control_loop_rate', 0.05)
self.role_name = self.get_param('role_name', 'ego_vehicle')

self.stanley_steer_sub: Subscriber = self.new_subscription(
Float32,
f"/carla/{self.role_name}/max_velocity",
self.__get_max_velocity,
self.position_sub: Subscriber = self.new_subscription(
Path,
f"/carla/{self.role_name}/path",
self.__set_path,
qos_profile=1)

# self.position_sub: Subscriber = self.new_subscription(
# GNSSMeasurement,
# f"/carla/{self.role_name}/Speed",
# self.__get_velocity,
# qos_profile=1)
self.heading_sub: Subscriber = self.new_subscription(
Imu,
f"/carla/{self.role_name}/IMU",
self.__set_heading,
qos_profile=1)

self.path_sub: Subscriber = self.new_subscription(
NavSatFix,
f"/carla/{self.role_name}/GPS",
self.__set_position,
qos_profile=1)

self.velocity_sub: Subscriber = self.new_subscription(
CarlaSpeedometer,
f"/carla/{self.role_name}/Speed",
self.__set_velocity,
qos_profile=1)

self.pure_pursuit_steer_pub: Publisher = self.new_publisher(
Float32,
Expand All @@ -41,8 +61,10 @@ def __init__(self):
durability=DurabilityPolicy.TRANSIENT_LOCAL)
)

self.__current_velocity: float = 0
self.__max_velocity: float = 0
self.__position: (float, float) = None # latitude, longitude in deg
self.__path: Path = None
self.__heading: float = None
self.__velocity: float = None

def run(self):
"""
Expand All @@ -51,6 +73,7 @@ def run(self):
"""
self.loginfo('PurePursuitController node running')
self.status_pub.publish(True) # todo: delete (this is only

# necessary if vehicle controller isn't running)

def loop(timer_event=None):
Expand All @@ -59,11 +82,75 @@ def loop(timer_event=None):
:param timer_event: Timer event from ROS
:return:
"""
pass
if self.__path is None:
self.logerr("PurePursuitController hasn't received a path yet "
"and can therefore not publish steering")
return
if self.__position is None:
self.logerr("PurePursuitController hasn't received the"
"position of the vehicle yet "
"and can therefore not publish steering")
return

if self.__heading is None:
self.logerr("PurePursuitController hasn't received the heading"
"of the vehicle yet and can therefore "
"not publish steering")
return

if self.__velocity is None:
self.logerr("PurePursuitController hasn't received the "
"velocity of the vehicle yet "
"and can therefore not publish steering")
return
self.pure_pursuit_steer_pub.publish(self.__calculate_steer())

self.new_timer(self.control_loop_rate, loop)
self.spin()

def __set_position(self, data: NavSatFix):
self.__position = (data.latitude, data.longitude)

def __set_path(self, data: Path):
self.__path = data

def __set_heading(self, data: Imu): # todo: test
rot_euler = euler_from_quaternion(data.orientation)
self.__heading = rot_euler[2]

def __set_velocity(self, data):
self.__velocity = data.speed

def __calculate_steer(self) -> float:
L = 2 # dist between front and rear wheels todo: measure
K = 10 # todo: tune
alpha = self.__get_heading_error()
look_ahead_dist = K * self.__velocity
steering_angle = atan((2 * L * sin(alpha)) / look_ahead_dist)
return steering_angle

def __get_heading_error(self) -> float:
my_heading = self.__heading
target_rot = self.__get_closest_point_on_path().orientation
rot_euler = euler_from_quaternion(target_rot)
target_heading = rot_euler[2]
return target_heading - my_heading

def __get_closest_point_on_path(self) -> Pose:
min_dist: float = 10e1000
min_pos: Pose = None
for pose in self.__path.poses:
if self.__dist_to(pose.position) <= min_dist:
min_dist = self.__dist_to(pose.position)
min_pos = pose
if min_pos is None:
raise Exception()
return min_pos

def __dist_to(self, pos: Point) -> float:
my_pos = numpy.array((self.__position[0], self.__position[1], pos[2]))
return numpy.linalg.norm(my_pos - pos)


def main(args=None):
"""
Expand Down

0 comments on commit 766c67b

Please sign in to comment.