-
Notifications
You must be signed in to change notification settings - Fork 0
/
waypoint_loader.py
executable file
·86 lines (66 loc) · 2.55 KB
/
waypoint_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python
import os
import csv
import math
from geometry_msgs.msg import Quaternion
from styx_msgs.msg import Lane, Waypoint
import tf
import rospy
CSV_HEADER = ['x', 'y', 'z', 'yaw']
MAX_DECEL = 1.0
class WaypointLoader(object):
def __init__(self):
rospy.init_node('waypoint_loader', log_level=rospy.DEBUG)
self.pub = rospy.Publisher('/base_waypoints', Lane, queue_size=1, latch=True)
self.velocity = self.kmph2mps(rospy.get_param('~velocity'))
self.new_waypoint_loader(rospy.get_param('~path'))
rospy.spin()
def new_waypoint_loader(self, path):
if os.path.isfile(path):
waypoints = self.load_waypoints(path)
self.publish(waypoints)
rospy.loginfo('Waypoint Loded')
else:
rospy.logerr('%s is not a file', path)
def quaternion_from_yaw(self, yaw):
return tf.transformations.quaternion_from_euler(0., 0., yaw)
def kmph2mps(self, velocity_kmph):
return (velocity_kmph * 1000.) / (60. * 60.)
def load_waypoints(self, fname):
waypoints = []
with open(fname) as wfile:
reader = csv.DictReader(wfile, CSV_HEADER)
for wp in reader:
p = Waypoint()
p.pose.pose.position.x = float(wp['x'])
p.pose.pose.position.y = float(wp['y'])
p.pose.pose.position.z = float(wp['z'])
q = self.quaternion_from_yaw(float(wp['yaw']))
p.pose.pose.orientation = Quaternion(*q)
p.twist.twist.linear.x = float(self.velocity)
waypoints.append(p)
return self.decelerate(waypoints)
def distance(self, p1, p2):
x, y, z = p1.x - p2.x, p1.y - p2.y, p1.z - p2.z
return math.sqrt(x*x + y*y + z*z)
def decelerate(self, waypoints):
last = waypoints[-1]
last.twist.twist.linear.x = 0.
for wp in waypoints[:-1][::-1]:
dist = self.distance(wp.pose.pose.position, last.pose.pose.position)
vel = math.sqrt(2 * MAX_DECEL * dist)
if vel < 1.:
vel = 0.
wp.twist.twist.linear.x = min(vel, wp.twist.twist.linear.x)
return waypoints
def publish(self, waypoints):
lane = Lane()
lane.header.frame_id = '/world'
lane.header.stamp = rospy.Time(0)
lane.waypoints = waypoints
self.pub.publish(lane)
if __name__ == '__main__':
try:
WaypointLoader()
except rospy.ROSInterruptException:
rospy.logerr('Could not start waypoint node.')