forked from ArduPilot/ardupilot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
325f686
commit e97c4fb
Showing
1 changed file
with
220 additions
and
0 deletions.
There are no files selected for viewing
220 changes: 220 additions & 0 deletions
220
Tools/ros2/ardupilot_dds_tests/test/ardupilot_dds_tests/test_joy_msg_received.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
#!/usr/bin/env python3 | ||
# Copyright 2023 ArduPilot.org. | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
""" | ||
Bring up ArduPilot SITL and check the Joy message is being published. | ||
colcon test --packages-select ardupilot_dds_tests \ | ||
--event-handlers=console_cohesion+ --pytest-args -k test_joy_msg_received | ||
""" | ||
|
||
import rclpy | ||
import time | ||
import launch_pytest | ||
from rclpy.node import Node | ||
from builtin_interfaces.msg import Time | ||
from geographic_msgs.msg import GeoPoseStamped | ||
from geopy import point | ||
from ardupilot_msgs.srv import ArmMotors | ||
from ardupilot_msgs.srv import ModeSwitch | ||
from sensor_msgs.msg import Joy | ||
from scipy.spatial.transform import Rotation as R | ||
import pytest | ||
from launch_pytest.tools import process as process_tools | ||
from launch import LaunchDescription | ||
import threading | ||
|
||
PLANE_MODE_TAKEOFF = 13 | ||
PLANE_MODE_GUIDED = 15 | ||
PLANE_MODE_FBWB = 6 | ||
GUIDED = 4 | ||
LOITER = 5 | ||
|
||
FRAME_GLOBAL_INT = 5 | ||
|
||
# Hard code some known locations | ||
# Note - Altitude in geopy is in km! | ||
GRAYHOUND_TRACK = point.Point(latitude=-35.345996, longitude=149.159017, altitude=0.575) | ||
CMAC = point.Point(latitude=-35.3627010, longitude=149.1651513, altitude=0.585) | ||
|
||
|
||
class PlaneFbwbJoyControl(Node): | ||
"""Plane follow waypoints using guided control.""" | ||
|
||
def __init__(self): | ||
"""Initialise the node.""" | ||
super().__init__("copter_joy_listener") | ||
|
||
# self.climbing_event_object = threading.Event() | ||
# self.moving_event_object = threading.Event() | ||
|
||
self._client_arm = self.create_client(ArmMotors, "/ap/arm_motors") | ||
while not self._client_arm.wait_for_service(timeout_sec=1.0): | ||
self.get_logger().info('arm service not available, waiting again...') | ||
|
||
self._client_mode_switch = self.create_client(ModeSwitch, "/ap/mode_switch") | ||
while not self._client_mode_switch.wait_for_service(timeout_sec=1.0): | ||
self.get_logger().info('mode switch service not available, waiting again...') | ||
|
||
self._joy_pub = self.create_publisher(Joy, "/ap/joy", 1) | ||
|
||
# Create subscriptions after services are up | ||
qos = rclpy.qos.QoSProfile( | ||
reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT, durability=rclpy.qos.DurabilityPolicy.VOLATILE, depth=1 | ||
) | ||
self._subscription_geopose = self.create_subscription(GeoPoseStamped, "/ap/geopose/filtered", self.geopose_cb, qos) | ||
self._cur_geopose = GeoPoseStamped() | ||
|
||
# Add a spin thread. | ||
# self.ros_spin_thread = threading.Thread(target=lambda node: rclpy.spin(node), args=(self,)) | ||
# self.ros_spin_thread.start() | ||
|
||
def geopose_cb(self, msg: GeoPoseStamped): | ||
"""Process a GeoPose message.""" | ||
stamp = msg.header.stamp | ||
if stamp.sec: | ||
self.get_logger().info("From AP : Geopose [sec:{}, nsec: {}]".format(stamp.sec, stamp.nanosec)) | ||
|
||
# Store current state | ||
self._cur_geopose = msg | ||
|
||
def arm(self): | ||
req = ArmMotors.Request() | ||
req.arm = True | ||
future = self._client_arm.call_async(req) | ||
rclpy.spin_until_future_complete(self, future) | ||
return future.result() | ||
|
||
def arm_with_timeout(self, timeout: rclpy.duration.Duration): | ||
"""Try to arm. Returns true on success, or false if arming fails or times out.""" | ||
armed = False | ||
start = self.get_clock().now() | ||
while not armed and self.get_clock().now() - start < timeout: | ||
armed = self.arm().result | ||
time.sleep(1) | ||
return armed | ||
|
||
def switch_mode(self, mode): | ||
req = ModeSwitch.Request() | ||
assert mode in [GUIDED, LOITER] | ||
req.mode = mode | ||
future = self._client_mode_switch.call_async(req) | ||
rclpy.spin_until_future_complete(self, future) | ||
return future.result() | ||
|
||
def switch_mode_with_timeout(self, desired_mode: int, timeout: rclpy.duration.Duration): | ||
"""Try to switch mode. Returns true on success, or false if mode switch fails or times out.""" | ||
is_in_desired_mode = False | ||
start = self.get_clock().now() | ||
while not is_in_desired_mode: | ||
result = self.switch_mode(desired_mode) | ||
# Handle successful switch or the case that the vehicle is already in expected mode | ||
is_in_desired_mode = result.status or result.curr_mode == desired_mode | ||
time.sleep(1) | ||
|
||
return is_in_desired_mode | ||
|
||
def get_cur_geopose(self): | ||
"""Return latest geopose.""" | ||
return self._cur_geopose | ||
|
||
def send_goal_position(self, goal_global_pos): | ||
"""Send goal position. Must be in guided for this to work.""" | ||
self._global_pos_pub.publish(goal_global_pos) | ||
|
||
def send_joy_value(self, joy_val): | ||
self._joy_pub.publish(joy_val) | ||
|
||
def ros_quat_to_heading_deg(quat): | ||
# By default, scipy follows scalar-last order – (x, y, z, w) | ||
rot = R.from_quat([quat.x, quat.y, quat.z, quat.w]) | ||
r, p, y = rot.as_euler(seq="xyz", degrees=True) | ||
return y | ||
|
||
def angle_diff_deg(a, b): | ||
diff = a - b | ||
diff = (diff + 180) % 360 - 180 | ||
return diff | ||
|
||
|
||
@launch_pytest.fixture | ||
def launch_sitl_copter_dds_serial(sitl_copter_dds_serial): | ||
"""Fixture to create the launch description.""" | ||
sitl_ld, sitl_actions = sitl_copter_dds_serial | ||
|
||
ld = LaunchDescription( | ||
[ | ||
sitl_ld, | ||
launch_pytest.actions.ReadyToTest(), | ||
] | ||
) | ||
actions = sitl_actions | ||
yield ld, actions | ||
|
||
|
||
@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial) | ||
def test_dds_joy_msg_recv(launch_context, launch_sitl_copter_dds_serial): | ||
"""Test position messages are published by AP_DDS.""" | ||
_, actions = launch_sitl_copter_dds_serial | ||
virtual_ports = actions["virtual_ports"].action | ||
micro_ros_agent = actions["micro_ros_agent"].action | ||
mavproxy = actions["mavproxy"].action | ||
sitl = actions["sitl"].action | ||
|
||
# Wait for process to start. | ||
process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2) | ||
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2) | ||
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2) | ||
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2) | ||
|
||
rclpy.init() | ||
|
||
try: | ||
node = PlaneFbwbJoyControl() | ||
|
||
# Block till armed, which will wait for EKF3 to initialize | ||
assert node.arm_with_timeout(rclpy.duration.Duration(seconds=30)) | ||
|
||
# Block till in takeoff | ||
assert node.switch_mode_with_timeout(LOITER, rclpy.duration.Duration(seconds=20)) | ||
|
||
joy_msg = Joy() | ||
joy_msg.axes.append(0.0) | ||
joy_msg.axes.append(0.0) | ||
joy_msg.axes.append(1.0) #- straight up | ||
joy_msg.axes.append(0.0) | ||
|
||
is_ascending_to_takeoff_alt = True | ||
initial_altitude = node.get_cur_geopose().pose.position.altitude | ||
start = node.get_clock().now() | ||
while is_ascending_to_takeoff_alt and node.get_clock().now() - start < rclpy.duration.Duration(seconds=10): | ||
node.send_joy_value(joy_msg) | ||
time.sleep(0.2) | ||
current_altitude = node.get_cur_geopose().pose.position.altitude | ||
# print (current_altitude) | ||
is_ascending_to_takeoff_alt = current_altitude - initial_altitude < 10 | ||
# rclpy.spin_once(node) | ||
|
||
# assert (not is_ascending_to_takeoff_alt), "Failed to reach takeoff altitude" | ||
|
||
assert True | ||
|
||
|
||
finally: | ||
rclpy.shutdown() | ||
yield | ||
|