forked from udacity/CarND-Capstone
-
Notifications
You must be signed in to change notification settings - Fork 3
/
dbw_node.py
executable file
·142 lines (113 loc) · 5.83 KB
/
dbw_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
import rospy
from std_msgs.msg import Bool
from dbw_mkz_msgs.msg import ThrottleCmd, SteeringCmd, BrakeCmd, SteeringReport
from geometry_msgs.msg import TwistStamped
import math
from twist_controller_mod import Controller
'''
DBW node for controlling throttle, brake, and steering.
Partially inspired by DataSpeed DBW controller:
https://bitbucket.org/DataspeedInc/dbw_mkz_ros/raw/ecfb61cbad05e06d5de0d6a971c63c955230a982/README.pdf
'''
class DBWNode(object):
def __init__(self):
rospy.init_node('dbw_node')
vehicle_mass = rospy.get_param('~vehicle_mass', 1736.35)
fuel_capacity = rospy.get_param('~fuel_capacity', 13.5)
brake_deadband = rospy.get_param('~brake_deadband', .1)
decel_limit = rospy.get_param('~decel_limit', -5)
accel_limit = rospy.get_param('~accel_limit', 1.)
wheel_radius = rospy.get_param('~wheel_radius', 0.2413)
wheel_base = rospy.get_param('~wheel_base', 2.8498)
steer_ratio = rospy.get_param('~steer_ratio', 14.8)
max_lat_accel = rospy.get_param('~max_lat_accel', 3.)
max_steer_angle = rospy.get_param('~max_steer_angle', 8.)
throttle_limit = rospy.get_param('~throttle_limit', 0.2)
dyn_velo_proportional_control = rospy.get_param(
'~dyn_velo_proportional_control', 0.2)
dyn_velo_integral_control = rospy.get_param(
'~dyn_velo_integral_control', 0.01)
dyn_braking_proportional_control = rospy.get_param(
'~dyn_braking_proportional_control', 200)
dyn_braking_integral_control = rospy.get_param(
'~dyn_braking_integral_control', 0)
# TODO - define min_speed as a parameter in the parameter server?
min_speed = 0.
self.steer_pub = rospy.Publisher('/vehicle/steering_cmd',
SteeringCmd, queue_size=1)
self.throttle_pub = rospy.Publisher('/vehicle/throttle_cmd',
ThrottleCmd, queue_size=1)
self.brake_pub = rospy.Publisher('/vehicle/brake_cmd',
BrakeCmd, queue_size=1)
self.target_linear_velocity = None
self.target_angular_velocity = None
self.current_linear_velocity = None
self.is_decelerating = False
self.actual_linear_velocity = 0.0
self.update_rate_hz = 50.0
# assume drive by wire is disabled to begin
self.dbw_enabled = False
# create low pass filter for measuring acceleration
default_update_rate = 1.0 / self.update_rate_hz
# Create `Controller` object
self.controller = Controller(
default_update_rate, wheel_base, steer_ratio, min_speed, max_lat_accel, max_steer_angle, decel_limit, throttle_limit, fuel_capacity, vehicle_mass, wheel_radius, dyn_velo_proportional_control, dyn_velo_integral_control, dyn_braking_proportional_control, dyn_braking_integral_control)
# Subscribe to messages about car being under drive by wire control
rospy.Subscriber(
'/vehicle/dbw_enabled', Bool, self.handleDBWEnabledMessage)
# Subscribe to messages from waypoint follower
rospy.Subscriber(
'/twist_cmd', TwistStamped, self.handle_target_velocity_message)
# Subscribe to steering report from vehicle
rospy.Subscriber(
'/current_velocity', TwistStamped, self.handle_steering_report)
# Subscribe to is_decelerating message from waypoint updater
rospy.Subscriber(
'/is_decelerating', Bool, self.handle_is_decelerating)
self.loop()
def loop(self):
rate = rospy.Rate(self.update_rate_hz)
while not rospy.is_shutdown():
# Get predicted throttle, brake, and steering
if (self.can_predict_controls()):
throttle, brake, steering = self.controller.control(self.target_linear_velocity,
self.target_angular_velocity,
self.actual_linear_velocity,
self.is_decelerating)
self.publish(throttle, brake, steering)
rate.sleep()
def publish(self, throttle, brake, steer):
tcmd = ThrottleCmd()
tcmd.enable = True
tcmd.pedal_cmd_type = ThrottleCmd.CMD_PERCENT
tcmd.pedal_cmd = throttle
self.throttle_pub.publish(tcmd)
scmd = SteeringCmd()
scmd.enable = True
scmd.steering_wheel_angle_cmd = steer
self.steer_pub.publish(scmd)
bcmd = BrakeCmd()
bcmd.enable = True
bcmd.pedal_cmd_type = BrakeCmd.CMD_TORQUE
bcmd.pedal_cmd = brake
self.brake_pub.publish(bcmd)
def handleDBWEnabledMessage(self, dbw_enabled_message):
self.dbw_enabled = dbw_enabled_message.data
rospy.loginfo(
'Drive by Wire %s', "enabled" if self.dbw_enabled else "disabled")
if (not self.dbw_enabled):
self.controller.reset()
def handle_is_decelerating(self, is_decelerating_message):
self.is_decelerating = is_decelerating_message.data
def handle_target_velocity_message(self, twist_velocity_command_message):
twist = twist_velocity_command_message.twist
self.target_linear_velocity = twist.linear.x
self.target_angular_velocity = twist.angular.z
def handle_steering_report(self, twist_current_velocity_message):
# record actual linear vehicle velocity
self.actual_linear_velocity = twist_current_velocity_message.twist.linear.x
def can_predict_controls(self):
return self.dbw_enabled and self.target_linear_velocity is not None and self.target_angular_velocity is not None and self.actual_linear_velocity is not None
if __name__ == '__main__':
DBWNode()