-
Notifications
You must be signed in to change notification settings - Fork 0
/
waypoint_updater.py
executable file
·158 lines (116 loc) · 5.08 KB
/
waypoint_updater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python
import math
import numpy as np
import rospy
from geometry_msgs.msg import PoseStamped
from scipy.spatial import KDTree
from styx_msgs.msg import Lane, Waypoint
from std_msgs.msg import Int32
'''
This node will publish waypoints from the car's current position to some `x` distance ahead.
As mentioned in the doc, you should ideally first implement a version which does not care
about traffic lights or obstacles.
Once you have created dbw_node, you will update this node to use the status of traffic lights too.
Please note that our simulator also provides the exact location of traffic lights and their
current status in `/vehicle/traffic_lights` message. You can use this message to build this node
as well as to verify your TL classifier.
TODO (for Yousuf and Aaron): Stopline location for each traffic light.
'''
LOOKAHEAD_WPS = 200 # Number of waypoints we will publish. You can change this number
MAX_DECEL = 1.0
class WaypointUpdater(object):
def __init__(self):
rospy.init_node('waypoint_updater')
# TODO: Add other member variables you need below
self.base_lane = None
self.pose = None
self.stopline_wp_idx = -1
self.waypoints_2d = None
self.waypoint_tree = None
rospy.Subscriber('/current_pose', PoseStamped, self.pose_cb)
rospy.Subscriber('/base_waypoints', Lane, self.waypoints_cb)
rospy.Subscriber('/traffic_waypoint', Int32, self.traffic_cb)
# TODO: Add a subscriber for /traffic_waypoint and /obstacle_waypoint below
self.final_waypoints_pub = rospy.Publisher('final_waypoints', Lane, queue_size=1)
self.loop()
def loop(self):
rate = rospy.Rate(50) # 50 Hz
while not rospy.is_shutdown():
if self.pose and self.base_lane:
# Get closest waypoint
self.publish_waypoints()
rate.sleep()
def get_closest_waypoint_idx(self):
x = self.pose.pose.position.x
y = self.pose.pose.position.y
closest_idx = self.waypoint_tree.query([x, y], 1)[1]
# Check if closest is ahead or behind vehicle
closest_coord = self.waypoints_2d[closest_idx]
prev_coord = self.waypoints_2d[closest_idx - 1]
# Equation for hyperplane through closest_coords
cl_vect = np.array(closest_coord)
prev_vect = np.array(prev_coord)
pos_vect = np.array([x, y])
val = np.dot(cl_vect - prev_vect, pos_vect - cl_vect)
if val > 0:
closest_idx = (closest_idx + 1) % len(self.waypoints_2d)
return closest_idx
def publish_waypoints(self):
final_lane = self.generate_lane()
self.final_waypoints_pub.publish(final_lane)
def generate_lane(self):
lane = Lane()
closest_idx = self.get_closest_waypoint_idx()
farthest_idx = closest_idx + LOOKAHEAD_WPS
base_waypoints = self.base_lane.waypoints[closest_idx:farthest_idx]
if self.stopline_wp_idx == -1 or (self.stopline_wp_idx >= farthest_idx):
lane.waypoints = base_waypoints
else:
lane.waypoints = self.decelerate_waypoints(base_waypoints, closest_idx)
return lane
def decelerate_waypoints(self, waypoints, closest_idx):
temp = []
for i, wp in enumerate(waypoints):
p = Waypoint()
p.pose = wp.pose
# Two waypoints back from line so front of car stops at line
stop_idx = max(self.stopline_wp_idx - closest_idx - 2, 0)
dist = self.distance(waypoints, i, stop_idx)
vel = math.sqrt(2 * MAX_DECEL * dist)
if vel < 1.:
vel = 0.
p.twist.twist.linear.x = min(vel, wp.twist.twist.linear.x)
temp.append(p)
return temp
def pose_cb(self, msg):
self.pose = msg
def waypoints_cb(self, waypoints):
self.base_lane = waypoints
if not self.waypoints_2d:
self.waypoints_2d = [
[waypoint.pose.pose.position.x, waypoint.pose.pose.position.y]
for waypoint in waypoints.waypoints
]
self.waypoint_tree = KDTree(self.waypoints_2d)
def traffic_cb(self, msg):
# Callback for /traffic_waypoint message. Implement
self.stopline_wp_idx = msg.data
def obstacle_cb(self, msg):
# TODO: Callback for /obstacle_waypoint message. We will implement it later
pass
def get_waypoint_velocity(self, waypoint):
return waypoint.twist.twist.linear.x
def set_waypoint_velocity(self, waypoints, waypoint, velocity):
waypoints[waypoint].twist.twist.linear.x = velocity
def distance(self, waypoints, wp1, wp2):
dist = 0
dl = lambda a, b: math.sqrt((a.x - b.x) ** 2 + (a.y - b.y) ** 2 + (a.z - b.z) ** 2)
for i in range(wp1, wp2 + 1):
dist += dl(waypoints[wp1].pose.pose.position, waypoints[i].pose.pose.position)
wp1 = i
return dist
if __name__ == '__main__':
try:
WaypointUpdater()
except rospy.ROSInterruptException:
rospy.logerr('Could not start waypoint updater node.')