diff --git a/realsense2_camera/CMakeLists.txt b/realsense2_camera/CMakeLists.txt
index fd06cb4868..67361a948c 100644
--- a/realsense2_camera/CMakeLists.txt
+++ b/realsense2_camera/CMakeLists.txt
@@ -304,6 +304,36 @@ if(BUILD_TESTING)
)
endforeach()
endforeach()
+
+ unset(_pytest_folders)
+
+ set(rs_query_cmd "rs-enumerate-devices -s")
+ execute_process(COMMAND bash -c ${rs_query_cmd}
+ WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
+ RESULT_VARIABLE rs_result
+ OUTPUT_VARIABLE RS_DEVICE_INFO)
+ message(STATUS "rs_device_info:")
+ message(STATUS "${RS_DEVICE_INFO}")
+ if((RS_DEVICE_INFO MATCHES "D455") OR (RS_DEVICE_INFO MATCHES "D415") OR (RS_DEVICE_INFO MATCHES "D435"))
+ message(STATUS "D455 device found")
+ set(_pytest_live_folders
+ test/live_camera
+ )
+ endif()
+
+ foreach(test_folder ${_pytest_live_folders})
+ file(GLOB files "${test_folder}/test_*.py")
+ foreach(file ${files})
+
+ get_filename_component(_test_name ${file} NAME_WE)
+ ament_add_pytest_test(${_test_name} ${file}
+ APPEND_ENV PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_SOURCE_DIR}/test/utils:${CMAKE_SOURCE_DIR}/launch:${CMAKE_SOURCE_DIR}/scripts
+ TIMEOUT 500
+ WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
+ )
+ endforeach()
+ endforeach()
+
endif()
# Ament exports
diff --git a/realsense2_camera/package.xml b/realsense2_camera/package.xml
index ab4d1d763b..6db46cff68 100644
--- a/realsense2_camera/package.xml
+++ b/realsense2_camera/package.xml
@@ -38,7 +38,8 @@
sensor_msgs_py
python3-requests
tf2_ros_py
-
+ ros2topic
+
launch_ros
ros_environment
diff --git a/realsense2_camera/test/README.md b/realsense2_camera/test/README.md
index 0be7600c24..ea3a043a92 100644
--- a/realsense2_camera/test/README.md
+++ b/realsense2_camera/test/README.md
@@ -146,7 +146,32 @@ The below command finds the test with the name test_static_tf_1 in realsense2_ca
pytest-3 -s -k test_static_tf_1 realsense2_camera/test/
-### Points to be noted while writing pytests
-The tests that are in one file are nromally run in parallel. So if there are multiple tests in one file, the system capacity can influence the test execution. It's recomended to have 3-4 tests in file, more than that can affect the test results due to delays.
-
+### Marking tests as regression tests
+Some of the tests, especially the live tests with multiple runs, for e.g., all profile tests (test_camera_all_profile_tests.py) take a long time. Such tests are marked are skipped with condition so that "colcon test" skips it.
+If a user wants to add a test to this conditional skip, user can add by adding a marker as below.
+
+@pytest.mark.skipif (os.getenv('RS_ROS_REGRESSION', "not found") == "not found",reason="Regression is not enabled, define RS_ROS_REGRESSION")
+
+### Running skipped regression tests
+1. Set the environment variable RS_ROS_REGRESSION as 1 and run the "colcon test"
+2. pytest example:
+ RS_ROS_REGRESSION=1 PYTHONPATH=$PYTHONPATH:$PWD/realsense2_camera/test/utils:$PWD/realsense2_camera//launch:$PWD/realsense2_camera//scripts pytest-3 -s realsense2_camera/test/live_camera/test_camera_aligned_tests.py -k test_camera_align_depth_color_all -m d415
+
+## Points to be noted while writing pytests
+The tests that are in one file are normally run in parallel, there could also be changes in the pytest plugin. So if there are multiple tests in one file, the system capacity can influence the test execution. It's recomended to have 3-4 tests in file, more than that can affect the test results due to delays.
+### Passing/changing parameters
+The parameters passed while creating the node can be initialized individually for each test, please see the test_parameterized_template example for reference. The default values are taken from rs_launch.py and the passed parameters are used for overriding the default values. The parameters that can be dynamically modified can be changed using the param interface provided. However, the function create_param_ifs has to be called to create this interface. Please see the test_d455_basic_tests.py for reference. There are specific functions to change the string, integer and bool parameters, the utils can be extended if any more types are needed.
+### Difference in setting the bool parameters
+There is a difference between setting the default values of bool parameters and setting them dynamically.
+The bool test params are assinged withn quotes as below.
+ test_params_all_profiles_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'enable_accel':"True",
+ 'enable_gyro':"True",
+ 'unite_imu_method':1,
+ }
+
+However the function that implements the setting of bool parameter dynamically takes the python bool datatype. For example:
+ self.set_bool_param('enable_accel', False)
diff --git a/realsense2_camera/test/live_camera/test_camera_aligned_tests.py b/realsense2_camera/test/live_camera/test_camera_aligned_tests.py
new file mode 100644
index 0000000000..0c17e76e47
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_camera_aligned_tests.py
@@ -0,0 +1,279 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+from pytest_rs_utils import launch_descr_with_parameters
+from pytest_rs_utils import delayed_launch_descr_with_parameters
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+import pytest_live_camera_utils
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+from pytest_live_camera_utils import debug_print
+
+
+
+test_params_align_depth_color_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'enable_color':'true',
+ 'enable_depth':'true',
+ 'depth_module.profile':'848x480x30',
+ 'rgb_camera.profile':'640x480x30',
+ 'align_depth.enable':'true'
+ }
+test_params_align_depth_color_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ 'enable_color':'true',
+ 'enable_depth':'true',
+ 'depth_module.profile':'848x480x30',
+ 'rgb_camera.profile':'640x480x30',
+ 'align_depth.enable':'true'
+ }
+'''
+This test was implemented as a template to set the parameters and run the test.
+This directory is not added to the CMakeLists.txt so as to avoid the colcon failure in the
+machines that don't have the D455 connected.
+1. Only a subset of parameter types are implemented in py_rs_utils, it has to be extended for others
+2. After setting the param, rclpy.spin_once may be needed.Test passes even without this though.
+'''
+
+@pytest.mark.parametrize("launch_descr_with_parameters",[
+ pytest.param(test_params_align_depth_color_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_align_depth_color_d415, marks=pytest.mark.d415),
+ #pytest.param(test_params_align_depth_color_d435, marks=pytest.mark.d435),
+ ]
+ ,indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestCamera_AlignDepthColor(pytest_rs_utils.RsTestBaseClass):
+ def test_camera_align_depth_color(self,launch_descr_with_parameters):
+ params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False:
+ print("Device not found? : " + params['device_type'])
+ return
+ themes = [
+ {'topic':get_node_heirarchy(params)+'/color/image_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':640,
+ 'height':480,
+ },
+ {'topic':get_node_heirarchy(params)+'/depth/image_rect_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':848,
+ 'height':480,
+ },
+ {'topic':get_node_heirarchy(params)+'/aligned_depth_to_color/image_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':640,
+ 'height':480,
+ },
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ print("Starting camera test...")
+ self.init_test("RsTest"+params['camera_name'])
+ self.wait_for_node(params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(params))
+ ret = self.run_test(themes)
+ assert ret[0], ret[1]
+ assert self.process_data(themes)
+ self.set_string_param('rgb_camera.profile', '1280x720x30')
+ self.set_bool_param('enable_color', True)
+ themes[0]['width'] = 1280
+ themes[0]['height'] = 720
+ themes[2]['width'] = 1280
+ themes[2]['height'] = 720
+
+ ret = self.run_test(themes)
+ assert ret[0], ret[1]
+ assert self.process_data(themes)
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
+
+test_params_all_profiles_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'enable_color':'true',
+ 'enable_depth':'true',
+ 'depth_module.profile':'848x480x30',
+ 'rgb_camera.profile':'640x480x30',
+ 'align_depth.enable':'true'
+ }
+test_params_all_profiles_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ 'enable_color':'true',
+ 'enable_depth':'true',
+ 'depth_module.profile':'848x480x30',
+ 'rgb_camera.profile':'640x480x30',
+ 'align_depth.enable':'true'
+ }
+test_params_all_profiles_d435 = {
+ 'camera_name': 'D435',
+ 'device_type': 'D435',
+ 'enable_color':'true',
+ 'enable_depth':'true',
+ 'depth_module.profile':'848x480x30',
+ 'rgb_camera.profile':'640x480x30',
+ 'align_depth.enable':'true'
+ }
+
+
+'''
+This test was implemented as a template to set the parameters and run the test.
+This directory is not added to the CMakeLists.txt so as to avoid the colcon failure in the
+machines that don't have the D455 connected.
+1. Only a subset of parameter types are implemented in py_rs_utils, it has to be extended for others
+2. After setting the param, rclpy.spin_once may be needed.Test passes even without this though.
+'''
+@pytest.mark.skipif (os.getenv('RS_ROS_REGRESSION', "not found") == "not found",reason="Regression is not enabled, define RS_ROS_REGRESSION")
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_all_profiles_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_all_profiles_d415, marks=pytest.mark.d415),
+ pytest.param(test_params_all_profiles_d435, marks=pytest.mark.d435),]
+ ,indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestCamera_AllAlignDepthColor(pytest_rs_utils.RsTestBaseClass):
+ def test_camera_all_align_depth_color(self,launch_descr_with_parameters):
+ skipped_tests = []
+ failed_tests = []
+ num_passed = 0
+ num_failed = 0
+ params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False:
+ print("Device not found? : " + params['device_type'])
+ return
+ themes = [
+ {'topic':get_node_heirarchy(params)+'/color/image_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':640,
+ 'height':480,
+ },
+ {'topic':get_node_heirarchy(params)+'/depth/image_rect_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':848,
+ 'height':480,
+ },
+ {'topic':get_node_heirarchy(params)+'/aligned_depth_to_color/image_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':640,
+ 'height':480,
+ },
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ self.init_test("RsTest"+params['camera_name'])
+ self.spin_for_time(wait_time=1.0)
+ cap = pytest_live_camera_utils.get_camera_capabilities(params['device_type'])
+ if cap == None:
+ debug_print("Device not found? : " + params['device_type'])
+ return
+ self.create_param_ifs(get_node_heirarchy(params))
+ color_profiles = set([i[1] for i in cap["color_profile"] if i[2] == "RGB8"])
+ depth_profiles = set([i[1] for i in cap["depth_profile"] if i[0] == "Depth"])
+ for color_profile in color_profiles:
+ for depth_profile in depth_profiles:
+ if depth_profile == color_profile:
+ continue
+ print("Testing the alignment of Depth:", depth_profile, " and Color:", color_profile)
+ self.set_bool_param('enable_color', False)
+ self.set_bool_param('enable_color', False)
+ self.set_bool_param('align_depth.enable', False)
+
+ themes[0]['width'] = themes[2]['width'] = int(color_profile.split('x')[0])
+ themes[0]['height'] = themes[2]['height'] = int(color_profile.split('x')[1])
+ themes[1]['width'] = int(depth_profile.split('x')[0])
+ themes[1]['height'] = int(depth_profile.split('x')[1])
+ dfps = int(depth_profile.split('x')[2])
+ cfps = int(color_profile.split('x')[2])
+ if dfps > cfps:
+ fps = cfps
+ else:
+ fps = dfps
+ timeout=100.0/fps
+ #for the changes to take effect
+ self.spin_for_time(wait_time=timeout/20)
+ self.set_string_param('rgb_camera.profile', color_profile)
+ self.set_string_param('depth_module.profile', depth_profile)
+ self.set_bool_param('enable_color', True)
+ self.set_bool_param('enable_color', True)
+ self.set_bool_param('align_depth.enable', True)
+ #for the changes to take effect
+ self.spin_for_time(wait_time=timeout/20)
+ try:
+ ret = self.run_test(themes, timeout=timeout)
+ assert ret[0], ret[1]
+ assert self.process_data(themes), "".join(str(depth_profile) + " " + str(color_profile)) + " failed"
+ num_passed += 1
+ except Exception as e:
+ exc_type, exc_obj, exc_tb = sys.exc_info()
+ fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
+ print("Test failed")
+ print("Tested the alignment of Depth:", depth_profile, " and Color:", color_profile, " with timeout ", timeout)
+ print(e)
+ print(exc_type, fname, exc_tb.tb_lineno)
+ num_failed += 1
+ failed_tests.append("".join(str(depth_profile) + " " + str(color_profile)))
+
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
+ print("Tests passed " + str(num_passed))
+ print("Tests skipped " + str(len(skipped_tests)))
+ if len(skipped_tests):
+ debug_print("\nSkipped tests:" + params['device_type'])
+ debug_print("\n".join(skipped_tests))
+ print("Tests failed " + str(num_failed))
+ if len(failed_tests):
+ print("\nFailed tests:" + params['device_type'])
+ print("\n".join(failed_tests))
+
+ def disable_all_params(self):
+ self.set_bool_param('enable_color', False)
+ self.set_bool_param('enable_depth', False)
+ self.set_bool_param('enable_infra', False)
+ self.set_bool_param('enable_infra1', False)
+ self.set_bool_param('enable_infra2', False)
+
diff --git a/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py b/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py
new file mode 100644
index 0000000000..b337e6f8d5
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_camera_all_profile_tests.py
@@ -0,0 +1,265 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+from pytest_rs_utils import launch_descr_with_parameters
+from pytest_rs_utils import delayed_launch_descr_with_parameters
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+import pytest_live_camera_utils
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+from pytest_live_camera_utils import debug_print
+def check_if_skip_test(profile, format):
+ '''
+ if profile == 'Color':
+ if "BGRA8" == format:
+ return True
+ if "RGBA8" == format:
+ return True
+ if "Y8" == format:
+ return True
+ elif profile == 'Depth':
+ if "Z16" == format:
+ return True
+ el
+ if profile == 'Infrared':
+ if "Y8" == format:
+ return True
+ if "Y16" == format:
+ return True
+ if "BGRA8" == format:
+ return True
+ if "RGBA8" == format:
+ return True
+ if "Y10BPACK" == format:
+ return True
+ if "UYVY" == format:
+ return True
+ if "BGR8" == format:
+ return True
+ if "RGB8" == format:
+ return True
+ if "RAW10" == format:
+ return True
+ elif profile == 'Infrared1':
+ if "Y8" ==format:
+ return True
+ if "Y16" ==format:
+ return True
+ if "Y10BPACK" == format:
+ return True
+ if "UYVY" ==format:
+ return True
+ if "BGR8" ==format:
+ return True
+ if "RGB8" ==format:
+ return True
+ if "RAW10" ==format:
+ return True
+ if profile == 'Infrared2':
+ if "Y8" == format:
+ return True
+ if "Y16" == format:
+ return True
+ if "Y10BPACK" == format:
+ return True
+ if "UYVY" == format:
+ return True
+ if "BGR8" == format:
+ return True
+ if "RGB8" == format:
+ return True
+ if "RAW10" == format:
+ return True
+ '''
+ if profile == 'Infrared':
+ if "Y8" == format:
+ return True
+ if "Y16" == format:
+ return True
+ if profile == 'Infrared1':
+ if "Y8" ==format:
+ return True
+ if "Y16" ==format:
+ return True
+ if profile == 'Infrared2':
+ if "Y8" == format:
+ return True
+ if "Y16" == format:
+ return True
+ return False
+
+
+test_params_all_profiles_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ }
+test_params_all_profiles_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ }
+test_params_all_profiles_d435 = {
+ 'camera_name': 'D435',
+ 'device_type': 'D435',
+ }
+
+
+'''
+This test was implemented as a template to set the parameters and run the test.
+This directory is not added to the CMakeLists.txt so as to avoid the colcon failure in the
+machines that don't have the D455 connected.
+1. Only a subset of parameter types are implemented in py_rs_utils, it has to be extended for others
+2. After setting the param, rclpy.spin_once may be needed.Test passes even without this though.
+'''
+@pytest.mark.skipif (os.getenv('RS_ROS_REGRESSION', "not found") == "not found",reason="Regression is not enabled, define RS_ROS_REGRESSION")
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_all_profiles_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_all_profiles_d415, marks=pytest.mark.d415),
+ pytest.param(test_params_all_profiles_d435, marks=pytest.mark.d435),]
+ ,indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestLiveCamera_Change_Resolution(pytest_rs_utils.RsTestBaseClass):
+ def test_LiveCamera_Change_Resolution(self,launch_descr_with_parameters):
+ skipped_tests = []
+ failed_tests = []
+ num_passed = 0
+ num_failed = 0
+ params = launch_descr_with_parameters[1]
+ themes = [{'topic':get_node_heirarchy(params)+'/color/image_raw', 'msg_type':msg_Image,'expected_data_chunks':1}]
+ config = pytest_live_camera_utils.get_profile_config(get_node_heirarchy(params))
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ self.init_test("RsTest"+params['camera_name'])
+ self.spin_for_time(wait_time=1.0)
+ cap = pytest_live_camera_utils.get_camera_capabilities(params['device_type'])
+ if cap == None:
+ debug_print("Device not found? : " + params['device_type'])
+ return
+ self.create_param_ifs(get_node_heirarchy(params))
+ config["Color"]["default_profile1"],config["Color"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["color_profile"], "Color")
+ config["Depth"]["default_profile1"],config["Depth"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["depth_profile"], "Depth")
+ config["Infrared"]["default_profile1"],config["Infrared"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["depth_profile"], "Infrared")
+ config["Infrared1"]["default_profile1"],config["Infrared1"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["depth_profile"], "Infrared1")
+ config["Infrared2"]["default_profile1"],config["Infrared2"]["default_profile2"] = pytest_live_camera_utils.get_default_profiles(cap["depth_profile"], "Infrared2")
+ for key in cap["color_profile"]:
+ profile_type = key[0]
+ profile = key[1]
+ format = key[2]
+ if check_if_skip_test(profile_type, format):
+ skipped_tests.append(" ".join(key))
+ continue
+ print("Testing " + " ".join(key))
+ themes[0]['topic'] = config[profile_type]['topic']
+ themes[0]['width'] = int(profile.split('x')[0])
+ themes[0]['height'] = int(profile.split('x')[1])
+ if themes[0]['width'] == int(config[profile_type]["default_profile2"].split('x')[0]):
+ self.set_string_param(config[profile_type]["profile"], config[profile_type]["default_profile1"])
+ else:
+ self.set_string_param(config[profile_type]["profile"], config[profile_type]["default_profile2"])
+ self.set_bool_param(config[profile_type]["param"], True)
+ self.disable_all_params()
+ self.spin_for_time(wait_time=0.2)
+ self.set_string_param(config[profile_type]["profile"], profile)
+ self.set_string_param(config[profile_type]["format"], format)
+ self.set_bool_param(config[profile_type]["param"], True)
+ try:
+ ret = self.run_test(themes)
+ assert ret[0], ret[1]
+ assert self.process_data(themes), " ".join(key) + " failed"
+ num_passed += 1
+ except Exception as e:
+ exc_type, exc_obj, exc_tb = sys.exc_info()
+ fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
+ print("Test failed")
+ print(e)
+ print(exc_type, fname, exc_tb.tb_lineno)
+ num_failed += 1
+ failed_tests.append(" ".join(key))
+ debug_print("Color tests completed")
+ for key in cap["depth_profile"]:
+ profile_type = key[0]
+ profile = key[1]
+ format = key[2]
+ if check_if_skip_test(profile_type, format):
+ skipped_tests.append(" ".join(key))
+ continue
+ print("Testing " + " ".join(key))
+
+ themes[0]['topic'] = config[profile_type]['topic']
+ themes[0]['width'] = int(profile.split('x')[0])
+ themes[0]['height'] = int(profile.split('x')[1])
+ if themes[0]['width'] == int(config[profile_type]["default_profile2"].split('x')[0]):
+ self.set_string_param(config[profile_type]["profile"], config[profile_type]["default_profile1"])
+ else:
+ self.set_string_param(config[profile_type]["profile"], config[profile_type]["default_profile2"])
+ self.set_bool_param(config[profile_type]["param"], True)
+
+
+ self.disable_all_params()
+ self.spin_for_time(wait_time=0.2)
+ self.set_string_param(config[profile_type]["profile"], profile)
+ self.set_string_param(config[profile_type]["format"], format)
+ self.set_bool_param(config[profile_type]["param"], True)
+ try:
+ ret = self.run_test(themes)
+ assert ret[0], ret[1]
+ assert self.process_data(themes), " ".join(key) + " failed"
+ num_passed += 1
+ except Exception as e:
+ print("Test failed")
+ print(e)
+ num_failed += 1
+ failed_tests.append(" ".join(key))
+ debug_print("Depth tests completed")
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
+ print("Tests passed " + str(num_passed))
+ print("Tests skipped " + str(len(skipped_tests)))
+ if len(skipped_tests):
+ debug_print("\nSkipped tests:" + params['device_type'])
+ debug_print("\n".join(skipped_tests))
+ print("Tests failed " + str(num_failed))
+ if len(failed_tests):
+ print("\nFailed tests:" + params['device_type'])
+ print("\n".join(failed_tests))
+
+ def disable_all_params(self):
+ self.set_bool_param('enable_color', False)
+ self.set_bool_param('enable_depth', False)
+ self.set_bool_param('enable_infra', False)
+ self.set_bool_param('enable_infra1', False)
+ self.set_bool_param('enable_infra2', False)
+
diff --git a/realsense2_camera/test/live_camera/test_camera_fps_tests.py b/realsense2_camera/test/live_camera/test_camera_fps_tests.py
new file mode 100644
index 0000000000..c4b91354d4
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_camera_fps_tests.py
@@ -0,0 +1,128 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+import pytest_live_camera_utils
+from pytest_rs_utils import launch_descr_with_parameters
+
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+
+test_params_test_fps_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+
+ }
+test_params_test_fps_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ }
+test_profiles ={
+ 'D455':{
+ 'depth_test_profiles':['640x480x5','640x480x15', '640x480x30', '640x480x90'],
+ 'color_test_profiles':['640x480x5','640x480x15', '640x480x30', '1280x720x30']
+ },
+ 'D415':{
+ 'depth_test_profiles':['640x480x6','640x480x15', '640x480x30', '640x480x90'],
+ 'color_test_profiles':['640x480x6','640x480x15', '640x480x30', '1280x720x30']
+ }
+}
+'''
+The test was implemented to check the fps of Depth and Color frames. The RosTopicHz had to be
+modified to make it work, see py_rs_utils for more details.
+To check the fps, a value 'expected_fps_in_hz' has to be added to the corresponding theme
+'''
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_test_fps_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_test_fps_d415, marks=pytest.mark.d415),
+ #pytest.param(test_params_test_fps_d435, marks=pytest.mark.d435),
+ ],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestCamera_TestFPS(pytest_rs_utils.RsTestBaseClass):
+ def test_camera_test_fps(self,launch_descr_with_parameters):
+ params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False:
+ print("Device not found? : " + params['device_type'])
+ return
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ print("Starting camera test...")
+ self.init_test("RsTest"+params['camera_name'])
+ self.wait_for_node(params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(params))
+ #assert self.set_bool_param('enable_color', False)
+
+ themes = [
+ {'topic':get_node_heirarchy(params)+'/depth/image_rect_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':100,
+ }
+ ]
+ profiles = test_profiles[params['camera_name']]['depth_test_profiles']
+ assert self.set_bool_param('enable_color', False)
+ for profile in profiles:
+ print("Testing profile: ", profile)
+ themes[0]['expected_fps_in_hz'] = float(profile.split('x')[2])
+ assert self.set_string_param('depth_module.profile', profile)
+ assert self.set_bool_param('enable_depth', True)
+ self.spin_for_time(0.5)
+ ret = self.run_test(themes, timeout=25.0)
+ assert ret[0], ret[1]
+ assert self.process_data(themes)
+
+ themes = [
+ {'topic':get_node_heirarchy(params)+'/color/image_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':100,
+ }
+ ]
+ assert self.set_bool_param('enable_depth', False)
+ profiles = test_profiles[params['camera_name']]['color_test_profiles']
+ for profile in profiles:
+ print("Testing profile: ", profile)
+ themes[0]['expected_fps_in_hz'] = float(profile.split('x')[2])
+ assert self.set_string_param('rgb_camera.profile', profile)
+ assert self.set_bool_param('enable_color', True)
+ ret = self.run_test(themes, timeout=25.0)
+ assert ret[0], ret[1]
+ assert self.process_data(themes)
+
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
+
+
diff --git a/realsense2_camera/test/live_camera/test_camera_imu_tests.py b/realsense2_camera/test/live_camera/test_camera_imu_tests.py
new file mode 100644
index 0000000000..384d249f47
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_camera_imu_tests.py
@@ -0,0 +1,120 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+from pytest_rs_utils import launch_descr_with_parameters
+from pytest_rs_utils import delayed_launch_descr_with_parameters
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+
+import pytest_live_camera_utils
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+from pytest_live_camera_utils import debug_print
+
+test_params_all_profiles_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'enable_accel':"True",
+ 'enable_gyro':"True",
+ 'unite_imu_method':1,
+ }
+
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_all_profiles_d455, marks=pytest.mark.d455)
+ ],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestLiveCamera_TestMotionSensor(pytest_rs_utils.RsTestBaseClass):
+ def test_LiveCamera_check_motion_sensor(self,launch_descr_with_parameters):
+ params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False:
+ print("Device not found? : " + params['device_type'])
+ return
+ themes = [{'topic':get_node_heirarchy(params)+'/imu', 'msg_type':msg_Imu,'expected_data_chunks':1},
+ {'topic':get_node_heirarchy(params)+'/gyro/sample', 'msg_type':msg_Imu,'expected_data_chunks':1},
+ {'topic':get_node_heirarchy(params)+'/accel/sample', 'msg_type':msg_Imu,'expected_data_chunks':1}]
+ IMU_TOPIC = 0
+ GYRO_TOPIC = 1
+ ACCEL_TOPIC = 2
+ if params['unite_imu_method'] == '0':
+ themes[IMU_TOPIC]['expected_data_chunks'] = 0
+ try:
+ #initialize
+ self.init_test("RsTest"+params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(params))
+
+
+ #run with default params and check the data
+ msg = "Test with the default params "
+ print(msg)
+ ret = self.run_test(themes, timeout=50)
+ assert ret[0], msg + str(ret[1])
+ assert self.process_data(themes), msg + " failed"
+
+ msg = "Test with the accel false "
+ self.set_integer_param('unite_imu_method', 0)
+ self.set_bool_param('enable_accel', False)
+ self.set_bool_param('enable_gyro', True)
+ themes[ACCEL_TOPIC]['expected_data_chunks'] = 0
+ themes[IMU_TOPIC]['expected_data_chunks'] = 0
+ print(msg)
+ ret = self.run_test(themes)
+ assert ret[0], msg + str(ret[1])
+ assert self.process_data(themes), msg + " failed"
+
+ msg = "Test with the gyro false "
+ self.set_bool_param('enable_accel', True)
+ self.set_bool_param('enable_gyro', False)
+ themes[IMU_TOPIC]['expected_data_chunks'] = 0
+ themes[ACCEL_TOPIC]['expected_data_chunks'] = 1
+ themes[GYRO_TOPIC]['expected_data_chunks'] = 0
+ print(msg)
+ self.spin_for_time(wait_time=1.0)
+ ret = self.run_test(themes)
+ assert ret[0], msg + str(ret[1])
+ assert self.process_data(themes), msg + " failed"
+
+ msg = "Test with both gyro and accel false "
+ self.set_bool_param('enable_accel', False)
+ self.set_bool_param('enable_gyro', False)
+ self.set_integer_param('unite_imu_method', 1)
+ self.spin_for_time(wait_time=1.0)
+ themes[ACCEL_TOPIC]['expected_data_chunks'] = 0
+ themes[GYRO_TOPIC]['expected_data_chunks'] = 0
+ themes[IMU_TOPIC]['expected_data_chunks'] = 0
+ print(msg)
+ ret = self.run_test(themes, initial_wait_time=1.0)
+ assert ret[0], msg + " failed"
+ assert self.process_data(themes), msg +" failed"
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
diff --git a/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py b/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py
new file mode 100644
index 0000000000..14def86ca9
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_camera_point_cloud_tests.py
@@ -0,0 +1,114 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from rclpy.qos import DurabilityPolicy
+from rclpy.qos import HistoryPolicy
+from rclpy.qos import QoSProfile
+import tf2_ros
+
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+from tf2_msgs.msg import TFMessage as msg_TFMessage
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+from pytest_rs_utils import launch_descr_with_parameters
+
+import pytest_live_camera_utils
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+
+'''
+The test was implemented to check the tf_static before and after infra1 was enabled. There shouldn't be any
+related data before enabling.
+'''
+test_params_points_cloud_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'pointcloud.enable': 'true'
+ }
+test_params_points_cloud_d435 = {
+ 'camera_name': 'D435',
+ 'device_type': 'D435',
+ 'pointcloud.enable': 'true'
+ }
+
+test_params_points_cloud_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ 'enable_color':'true',
+ 'enable_depth':'true',
+ 'depth_module.profile':'848x480x30',
+ 'pointcloud.enable': 'true'
+ }
+
+'''
+This test was ported from rs2_test.py
+the command used to run is "python3 realsense2_camera/scripts/rs2_test.py points_cloud_1"
+'''
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_points_cloud_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_points_cloud_d435, marks=pytest.mark.d435),
+ pytest.param(test_params_points_cloud_d415, marks=pytest.mark.d415),
+ ],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestCamera_TestPointCloud(pytest_rs_utils.RsTestBaseClass):
+ def test_camera_test_point_cloud(self,launch_descr_with_parameters):
+ self.params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(self.params['device_type']) == False:
+ print("Device not found? : " + self.params['device_type'])
+ return
+ themes = [
+ {
+ 'topic':get_node_heirarchy(self.params)+'/depth/color/points',
+ 'msg_type':msg_PointCloud2,
+ 'expected_data_chunks':5,
+ },
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ self.init_test("RsTest"+self.params['camera_name'])
+ self.wait_for_node(self.params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(self.params))
+ ret = self.run_test(themes, timeout=10)
+ assert ret[0], ret[1]
+ ret = self.process_data(themes, False)
+ assert ret[0], ret[1]
+ finally:
+ self.shutdown()
+ def process_data(self, themes, enable_infra1):
+ for count in range(self.node.get_num_chunks(get_node_heirarchy(self.params)+'/depth/color/points')):
+ data = self.node.pop_first_chunk(get_node_heirarchy(self.params)+'/depth/color/points')
+ print(data)#the frame counter starts with zero, jumps to 2 and continues. To be checked
+ return True,""
+
diff --git a/realsense2_camera/test/live_camera/test_camera_tf_tests.py b/realsense2_camera/test/live_camera/test_camera_tf_tests.py
new file mode 100644
index 0000000000..a31f74e77c
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_camera_tf_tests.py
@@ -0,0 +1,219 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from rclpy.qos import DurabilityPolicy
+from rclpy.qos import HistoryPolicy
+from rclpy.qos import QoSProfile
+import tf2_ros
+
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+from tf2_msgs.msg import TFMessage as msg_TFMessage
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+from pytest_rs_utils import launch_descr_with_parameters
+
+import pytest_live_camera_utils
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+
+'''
+The test was implemented to check the tf_static before and after infra1 was enabled. There shouldn't be any
+related data before enabling.
+'''
+test_params_tf_static_change_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'enable_infra1': 'false',
+ 'enable_infra2': 'true',
+ 'enable_accel': 'true',
+ 'enable_gyro': 'true',
+ }
+test_params_tf_static_change_d435 = {
+ 'camera_name': 'D435',
+ 'device_type': 'D435',
+ 'enable_infra1': 'false',
+ 'enable_infra2': 'true',
+ 'enable_accel': 'true',
+ 'enable_gyro': 'true',
+ }
+
+test_params_tf_static_change_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ 'enable_infra1': 'false',
+ 'enable_infra2': 'true',
+ 'enable_accel': 'true',
+ 'enable_gyro': 'true',
+ }
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_tf_static_change_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_tf_static_change_d435, marks=pytest.mark.d435),
+ pytest.param(test_params_tf_static_change_d415, marks=pytest.mark.d415),
+ ],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestCamera_TestTF_Static_change(pytest_rs_utils.RsTestBaseClass):
+ def test_camera_test_tf_static_change(self,launch_descr_with_parameters):
+ self.params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(self.params['device_type']) == False:
+ print("Device not found? : " + self.params['device_type'])
+ return
+ themes = [
+ {'topic':'/tf_static',
+ 'msg_type':msg_TFMessage,
+ 'expected_data_chunks':1,
+ 'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
+ }
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ self.init_test("RsTest"+self.params['camera_name'])
+ self.wait_for_node(self.params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(self.params))
+ ret = self.run_test(themes, timeout=10)
+ assert ret[0], ret[1]
+
+ ret = self.process_data(themes, False)
+ assert ret[0], ret[1]
+ assert self.set_bool_param('enable_infra1', True)
+
+ ret = self.run_test(themes, timeout=10)
+ assert ret[0], ret[1]
+ ret = self.process_data(themes, True)
+ assert ret[0], ret[1]
+ finally:
+ self.shutdown()
+ def process_data(self, themes, enable_infra1):
+ frame_ids = [self.params['camera_name']+'_link',
+ self.params['camera_name']+'_depth_frame',
+ self.params['camera_name']+'_infra2_frame',
+ self.params['camera_name']+'_color_frame']
+ if self.params['device_type'] == 'D455':
+ frame_ids.append(self.params['camera_name']+'_gyro_frame')
+ frame_ids.append(self.params['camera_name']+'_accel_frame')
+ frame_ids.append(self.params['camera_name']+'_infra1_frame')
+ data = self.node.pop_first_chunk('/tf_static')
+ coupled_frame_ids = [xx for xx in itertools.combinations(frame_ids, 2)]
+ res = self.get_transform_data(data, coupled_frame_ids, is_static=True)
+ for couple in coupled_frame_ids:
+ if self.params['camera_name']+'_infra1_frame' in couple:
+ if enable_infra1 == True and res[couple] != None:
+ continue
+ if enable_infra1 == False and res[couple] == None:
+ continue
+ return False, str(couple) + ": tf_data not as expected"
+ if res[couple] == None:
+ return False, str(couple) + ": didn't get any tf data"
+ return True,""
+
+
+test_params_tf_d455 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ 'publish_tf': 'true',
+ 'tf_publish_rate': '1.1',
+ }
+
+test_params_tf_d435 = {
+ 'camera_name': 'D435',
+ 'device_type': 'D435',
+ 'publish_tf': 'true',
+ 'tf_publish_rate': '1.1',
+ }
+
+test_params_tf_d415 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ 'publish_tf': 'true',
+ 'tf_publish_rate': '1.1',
+ }
+@pytest.mark.parametrize("launch_descr_with_parameters", [
+ pytest.param(test_params_tf_d455, marks=pytest.mark.d455),
+ pytest.param(test_params_tf_d435, marks=pytest.mark.d435),
+ pytest.param(test_params_tf_d415, marks=pytest.mark.d415),
+ ],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestCamera_TestTF_DYN(pytest_rs_utils.RsTestBaseClass):
+ def test_camera_test_tf_dyn(self,launch_descr_with_parameters):
+ self.params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(self.params['device_type']) == False:
+ print("Device not found? : " + self.params['device_type'])
+ return
+ themes = [
+ {'topic':'/tf',
+ 'msg_type':msg_TFMessage,
+ 'expected_data_chunks':3,
+ 'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.VOLATILE,history=HistoryPolicy.KEEP_LAST,)
+ #'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
+ },
+ {'topic':'/tf_static',
+ 'msg_type':msg_TFMessage,
+ 'expected_data_chunks':1,
+ #'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.VOLATILE,history=HistoryPolicy.KEEP_LAST,)
+ 'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
+ }
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ self.init_test("RsTest"+self.params['camera_name'])
+ self.wait_for_node(self.params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(self.params))
+ ret = self.run_test(themes, timeout=10)
+ assert ret[0], ret[1]
+ ret = self.process_data(themes, False)
+ assert ret[0], ret[1]
+ finally:
+ self.shutdown()
+
+ def process_data(self, themes, enable_infra1):
+ frame_ids = [self.params['camera_name']+'_link',
+ self.params['camera_name']+'_depth_frame',
+ self.params['camera_name']+'_color_frame']
+ data = self.node.pop_first_chunk('/tf_static')
+ ret = self.check_transform_data(data, frame_ids, True)
+ assert ret[0], ret[1]
+ data = self.node.pop_first_chunk('/tf')
+ ret = self.check_transform_data(data, frame_ids)
+ assert ret[0], ret[1]
+ data = self.node.pop_first_chunk('/tf')
+ ret = self.check_transform_data(data, frame_ids)
+ assert ret[0], ret[1]
+ data = self.node.pop_first_chunk('/tf')
+ ret = self.check_transform_data(data, frame_ids)
+ assert ret[0], ret[1]
+ #return True, ""
+
+ return True, ""
diff --git a/realsense2_camera/test/live_camera/test_d415_basic_tests.py b/realsense2_camera/test/live_camera/test_d415_basic_tests.py
new file mode 100644
index 0000000000..649f84e7bd
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_d415_basic_tests.py
@@ -0,0 +1,155 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+from pytest_rs_utils import launch_descr_with_parameters
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+import pytest_live_camera_utils
+
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+
+test_params_depth_avg_1 = {
+ 'camera_name': 'D415',
+ 'device_type': 'D415',
+ }
+'''
+This test was implemented as a template to set the parameters and run the test.
+This directory is not added to the CMakeLists.txt so as to avoid the colcon failure in the
+machines that don't have the D415 connected.
+1. Only a subset of parameter types are implemented in py_rs_utils, it has to be extended for others
+2. After setting the param, rclpy.spin_once may be needed.Test passes even without this though.
+'''
+@pytest.mark.d415
+@pytest.mark.parametrize("launch_descr_with_parameters", [test_params_depth_avg_1],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestD415_Change_Resolution(pytest_rs_utils.RsTestBaseClass):
+ def test_D415_Change_Resolution(self,launch_descr_with_parameters):
+ params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False:
+ print("Device not found? : " + params['device_type'])
+ return
+ failed_tests = []
+ num_passed = 0
+
+ num_failed = 0
+ themes = [{'topic':get_node_heirarchy(params)+'/color/image_raw', 'msg_type':msg_Image,'expected_data_chunks':1}]
+ config = pytest_live_camera_utils.get_profile_config(get_node_heirarchy(params))
+ config["Color"]["default_profile1"] = "640x480x6"
+ config["Color"]["default_profile2"] = "1280x720x6"
+ config["Depth"]["default_profile1"] = "640x480x6"
+ config["Depth"]["default_profile2"] = "1280x720x6"
+ config["Infrared"]["default_profile1"] = "640x480x6"
+ config["Infrared"]["default_profile2"] = "1280x720x6"
+ config["Infrared1"]["default_profile1"] = "640x480x6"
+ config["Infrared1"]["default_profile2"] = "1280x720x6"
+ config["Infrared2"]["default_profile1"] = "640x480x6"
+ config["Infrared2"]["default_profile2"] = "1280x720x6"
+
+ cap = [
+ #['Infrared1', '1920x1080x25', 'Y8'],
+ #['Infrared1', '1920x1080x15', 'Y16'],
+ ['Infrared', '848x100x100', 'BGRA8'],
+ ['Infrared', '848x480x60', 'RGBA8'],
+ ['Infrared', '640x480x60', 'RGBA8'],
+ ['Infrared', '640x480x60', 'BGR8'],
+ ['Infrared', '640x360x60', 'BGRA8'],
+ ['Infrared', '640x360x60', 'BGR8'],
+ ['Infrared', '640x360x30', 'UYVY'],
+ ['Infrared', '480x270x60', 'BGRA8'],
+ ['Infrared', '480x270x60', 'RGB8'],
+ ['Infrared', '424x240x60', 'BGRA8'],
+
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ self.init_test("RsTest"+params['camera_name'])
+ self.spin_for_time(wait_time=1.0)
+ self.create_param_ifs(get_node_heirarchy(params))
+
+ for key in cap:
+ profile_type = key[0]
+ profile = key[1]
+ format = key[2]
+ print("Testing " + " ".join(key))
+ themes[0]['topic'] = config[profile_type]['topic']
+ themes[0]['width'] = int(profile.split('x')[0])
+ themes[0]['height'] = int(profile.split('x')[1])
+ #'''
+ if themes[0]['width'] == int(config[profile_type]["default_profile2"].split('x')[0]):
+ self.set_string_param(config[profile_type]["profile"], config[profile_type]["default_profile1"])
+ else:
+ self.set_string_param(config[profile_type]["profile"], config[profile_type]["default_profile2"])
+ self.set_bool_param(config[profile_type]["param"], True)
+ self.disable_all_params()
+ #self.set_string_param("depth_profile", "640x480x6")
+ #self.set_bool_param("enable_depth", True)
+ #'''
+ self.spin_for_time(wait_time=0.2)
+ self.set_string_param(config[profile_type]["profile"], profile)
+ self.set_string_param(config[profile_type]["format"], format)
+ self.set_bool_param(config[profile_type]["param"], True)
+ try:
+ ret = self.run_test(themes, timeout=5.0)
+ assert ret[0], ret[1]
+ assert self.process_data(themes), " ".join(key) + " failed"
+ num_passed += 1
+ except Exception as e:
+ exc_type, exc_obj, exc_tb = sys.exc_info()
+ fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
+ print("Test failed")
+ print(e)
+ print(exc_type, fname, exc_tb.tb_lineno)
+ num_failed += 1
+ failed_tests.append(" ".join(key))
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
+ if num_failed != 0:
+ print("Failed tests:")
+ print("\n".join(failed_tests))
+ assert False, " Tests failed"
+
+
+ def disable_all_params(self):
+ '''
+ self.set_bool_param('enable_color', False)
+ self.set_bool_param('enable_depth', False)
+ self.set_bool_param('enable_infra', False)
+ self.set_bool_param('enable_infra1', False)
+ self.set_bool_param('enable_infra2', False)
+ '''
+ pass
+
diff --git a/realsense2_camera/test/live_camera/test_d455_basic_tests.py b/realsense2_camera/test/live_camera/test_d455_basic_tests.py
new file mode 100644
index 0000000000..7848ff4986
--- /dev/null
+++ b/realsense2_camera/test/live_camera/test_d455_basic_tests.py
@@ -0,0 +1,99 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+import itertools
+
+
+import pytest
+import rclpy
+
+from sensor_msgs.msg import Image as msg_Image
+from sensor_msgs.msg import Imu as msg_Imu
+from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
+import pytest_rs_utils
+import pytest_live_camera_utils
+from pytest_rs_utils import launch_descr_with_parameters
+
+from pytest_rs_utils import get_rosbag_file_path
+from pytest_rs_utils import get_node_heirarchy
+
+from rclpy.parameter import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+
+test_params_depth_avg_1 = {
+ 'camera_name': 'D455',
+ 'device_type': 'D455',
+ }
+'''
+This test was implemented as a template to set the parameters and run the test.
+This directory is not added to the CMakeLists.txt so as to avoid the colcon failure in the
+machines that don't have the D455 connected.
+1. Only a subset of parameter types are implemented in py_rs_utils, it has to be extended for others
+2. After setting the param, rclpy.spin_once may be needed.Test passes even without this though.
+'''
+@pytest.mark.d455
+@pytest.mark.parametrize("launch_descr_with_parameters", [test_params_depth_avg_1],indirect=True)
+@pytest.mark.launch(fixture=launch_descr_with_parameters)
+class TestD455_Change_Resolution(pytest_rs_utils.RsTestBaseClass):
+ def test_D455_Change_Resolution(self,launch_descr_with_parameters):
+ params = launch_descr_with_parameters[1]
+ if pytest_live_camera_utils.check_if_camera_connected(params['device_type']) == False:
+ print("Device not found? : " + params['device_type'])
+ return
+
+ themes = [
+ {'topic':get_node_heirarchy(params)+'/color/image_raw',
+ 'msg_type':msg_Image,
+ 'expected_data_chunks':1,
+ 'width':640,
+ 'height':480,
+ #'data':data
+ }
+ ]
+ try:
+ '''
+ initialize, run and check the data
+ '''
+ print("Starting camera test...")
+ self.init_test("RsTest"+params['camera_name'])
+ self.wait_for_node(params['camera_name'])
+ self.create_param_ifs(get_node_heirarchy(params))
+ #assert self.set_bool_param('enable_color', False)
+ assert self.set_string_param('rgb_camera.profile', '640x480x30')
+ assert self.set_bool_param('enable_color', True)
+ ret = self.run_test(themes)
+ assert ret[0], ret[1]
+ assert self.process_data(themes)
+ self.set_string_param('rgb_camera.profile', '1280x800x5')
+ self.set_bool_param('enable_color', True)
+ themes[0]['width'] = 1280
+ themes[0]['height'] = 800
+
+ ret = self.run_test(themes)
+ assert ret[0], ret[1]
+ assert self.process_data(themes)
+ finally:
+ #this step is important because the test will fail next time
+ pytest_rs_utils.kill_realsense2_camera_node()
+ self.shutdown()
+
+
diff --git a/realsense2_camera/test/rosbag/test_rosbag_all_topics_test.py b/realsense2_camera/test/rosbag/test_rosbag_all_topics_test.py
index 34abbc3039..9cda836f3a 100644
--- a/realsense2_camera/test/rosbag/test_rosbag_all_topics_test.py
+++ b/realsense2_camera/test/rosbag/test_rosbag_all_topics_test.py
@@ -44,21 +44,22 @@
test_params_all_topics = {"rosbag_filename":get_rosbag_file_path("outdoors_1color.bag"),
'camera_name': 'AllTopics',
- 'color_width': '0',
- 'color_height': '0',
- 'depth_width': '0',
- 'depth_height': '0',
- 'infra_width': '0',
- 'infra_height': '0',
'enable_infra1':'true',
'enable_infra2':'true',
- #'align_depth.enable':'true',
+ 'align_depth.enable':'true',
}
'''
To test all topics published
'''
-@pytest.mark.skip
+'''
+To test all topics published
+
+The test doesn't work in CI due to sync. The unlike the live camera, rosbag node publishes the extrinsics only once,
+not every time the subscription is created. The CI due to limited resource can start the ros node much earlier than
+the testcase, hence missing the published data by the test
+'''
@pytest.mark.rosbag
+@pytest.mark.skipif (os.getenv('RS_ROS_REGRESSION', "not found") == "not found",reason="The test doesn't work in CI")
@pytest.mark.parametrize("delayed_launch_descr_with_parameters", [test_params_all_topics],indirect=True)
@pytest.mark.launch(fixture=delayed_launch_descr_with_parameters)
class TestAllTopics(pytest_rs_utils.RsTestBaseClass):
diff --git a/realsense2_camera/test/rosbag/test_rosbag_depth_tests.py b/realsense2_camera/test/rosbag/test_rosbag_depth_tests.py
index f393f42709..51d574da85 100644
--- a/realsense2_camera/test/rosbag/test_rosbag_depth_tests.py
+++ b/realsense2_camera/test/rosbag/test_rosbag_depth_tests.py
@@ -20,10 +20,14 @@
import pytest
import rclpy
+from rclpy.qos import DurabilityPolicy
+from rclpy.qos import HistoryPolicy
+from rclpy.qos import QoSProfile
from sensor_msgs.msg import Image as msg_Image
from sensor_msgs.msg import Imu as msg_Imu
from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
+from tf2_msgs.msg import TFMessage as msg_TFMessage
import numpy as np
@@ -78,12 +82,6 @@ def test_depth_points_cloud_1(self,delayed_launch_descr_with_parameters):
'expected_data_chunks':1,
'data':data1
},
- {'topic':get_node_heirarchy(params)+'/depth/image_rect_raw',
- 'msg_type':msg_Image,
- 'expected_data_chunks':1,
- 'data':data2
- }
-
]
try:
'''
@@ -100,12 +98,10 @@ def process_data(self, themes):
test_params_static_tf_1 = {"rosbag_filename":get_rosbag_file_path("outdoors_1color.bag"),
- 'camera_name': 'Static_tf_1',
+ 'camera_name': 'Static_tf1',
'color_width': '0',
'color_height': '0',
- 'depth_width': '0',
- 'depth_height': '0',
- 'infra_width': '0',
+ "static_tf":True,
'infra_height': '0',
'enable_infra1':'true',
'enable_infra2':'true'
@@ -119,43 +115,45 @@ def process_data(self, themes):
@pytest.mark.launch(fixture=delayed_launch_descr_with_parameters)
class TestStaticTf1(pytest_rs_utils.RsTestBaseClass):
def test_static_tf_1(self,delayed_launch_descr_with_parameters):
- params = delayed_launch_descr_with_parameters[1]
- self.rosbag = params["rosbag_filename"]
- data = {('camera_link', 'camera_color_frame'): ([-0.00010158783697988838, 0.014841210097074509, -0.00022671300393994898], [-0.0008337442995980382, 0.0010442184284329414, -0.0009920650627464056, 0.9999986290931702]),
- ('camera_link', 'camera_depth_frame'): ([0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]),
- ('camera_link', 'camera_infra1_frame'): ([0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]),
- ('camera_depth_frame', 'camera_infra1_frame'): ([0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]),
- ('camera_depth_frame', 'camera_color_frame'): ([-0.00010158783697988838, 0.014841210097074509, -0.00022671300393994898], [-0.0008337442995980382, 0.0010442184284329414, -0.0009920650627464056, 0.9999986290931702]),
- ('camera_infra1_frame', 'camera_color_frame'): ([-0.00010158783697988838, 0.014841210097074509, -0.00022671300393994898], [-0.0008337442995980382, 0.0010442184284329414, -0.0009920650627464056, 0.9999986290931702])}
+ self.params = delayed_launch_descr_with_parameters[1]
+ self.rosbag = self.params["rosbag_filename"]
themes = [
- {'topic':get_node_heirarchy(params)+'/color/image_raw',
+ {'topic':get_node_heirarchy(self.params)+'/color/image_raw',
'msg_type':msg_Image,
'expected_data_chunks':1,
- 'data':data,
+ 'static_tf':True,
+ },
+ {'topic':'/tf_static',
+ 'msg_type':msg_TFMessage,
+ 'expected_data_chunks':1,
+ 'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
}
]
try:
'''
initialize, run and check the data
'''
- self.init_test("RsTest"+params['camera_name'])
+ self.init_test("RsTest"+self.params['camera_name'])
ret = self.run_test(themes)
assert ret[0], ret[1]
assert self.process_data(themes)
finally:
self.shutdown()
def process_data(self, themes):
- #print ('Gathering static transforms')
- frame_ids = ['camera_link', 'camera_depth_frame', 'camera_infra1_frame', 'camera_infra2_frame', 'camera_color_frame']
+ expected_data = {(self.params['camera_name']+'_link', self.params['camera_name']+'_color_frame'): ([-0.00010158783697988838, 0.014841210097074509, -0.00022671300393994898], [-0.0008337442995980382, 0.0010442184284329414, -0.0009920650627464056, 0.9999986290931702]),
+ (self.params['camera_name']+'_link', self.params['camera_name']+'_depth_frame'): ([0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]),
+ (self.params['camera_name']+'_link', self.params['camera_name']+'_infra1_frame'): ([0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]),
+ (self.params['camera_name']+'_depth_frame', self.params['camera_name']+'_infra1_frame'): ([0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]),
+ (self.params['camera_name']+'_depth_frame', self.params['camera_name']+'_color_frame'): ([-0.00010158783697988838, 0.014841210097074509, -0.00022671300393994898], [-0.0008337442995980382, 0.0010442184284329414, -0.0009920650627464056, 0.9999986290931702]),
+ (self.params['camera_name']+'_infra1_frame', self.params['camera_name']+'_color_frame'): ([-0.00010158783697988838, 0.014841210097074509, -0.00022671300393994898], [-0.0008337442995980382, 0.0010442184284329414, -0.0009920650627464056, 0.9999986290931702])}
+ frame_ids = [self.params['camera_name']+'_link', self.params['camera_name']+'_depth_frame', self.params['camera_name']+'_infra1_frame', self.params['camera_name']+'_infra2_frame', self.params['camera_name']+'_color_frame', self.params['camera_name']+'_fisheye_frame', self.params['camera_name']+'_pose']
+ coupled_frame_ids = [xx for xx in itertools.combinations(frame_ids, 2)]
+ data = self.node.pop_first_chunk('/tf_static')
coupled_frame_ids = [xx for xx in itertools.combinations(frame_ids, 2)]
- res = {}
- for couple in coupled_frame_ids:
- from_id, to_id = couple
- if (self.node.tfBuffer.can_transform(from_id, to_id, rclpy.time.Time(), rclpy.time.Duration(nanoseconds=3e6))):
- res[couple] = self.node.tfBuffer.lookup_transform(from_id, to_id, rclpy.time.Time(), rclpy.time.Duration(nanoseconds=1e6)).transform
- else:
- res[couple] = None
- return pytest_rs_utils.staticTFTest(res, themes[0]["data"])
+ tfs_data = self.get_transform_data(data, coupled_frame_ids, is_static=True)
+ ret = pytest_rs_utils.staticTFTest(tfs_data, expected_data)
+ assert ret[0], ret[1]
+ return ret[0]
test_params_non_existing_rosbag = {"rosbag_filename":"non_existent.bag",
diff --git a/realsense2_camera/test/utils/pytest_live_camera_utils.py b/realsense2_camera/test/utils/pytest_live_camera_utils.py
new file mode 100644
index 0000000000..9b5bfeac70
--- /dev/null
+++ b/realsense2_camera/test/utils/pytest_live_camera_utils.py
@@ -0,0 +1,207 @@
+# Copyright 2023 Intel Corporation. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import sys
+import time
+import ctypes
+import struct
+import requests
+import json
+
+from pytest_rs_utils import debug_print
+
+
+def get_profile_config(camera_name):
+ config = {
+ "Color":{"profile":"rgb_camera.profile", "format":'rgb_camera.color_format', "param":"enable_color", "topic":camera_name+'/color/image_raw',},
+ "Depth":{"profile":"depth_module.profile", "format":'depth_module.depth_format', "param":"enable_depth", 'topic':camera_name+'/depth/image_rect_raw'},
+ "Infrared":{"profile":"depth_module.profile", "format":'depth_module.infra_format', "param":"enable_infra", 'topic':camera_name+'/infra/image_rect_raw'},
+ "Infrared1":{"profile":"depth_module.profile", "format":'depth_module.infra1_format',"param":"enable_infra1", 'topic':camera_name+'/infra/image_rect_raw'},
+ "Infrared2":{"profile":"depth_module.profile", "format":'depth_module.infra2_format',"param":"enable_infra2", 'topic':camera_name+'/infra/image_rect_raw'},
+ }
+ return config
+
+
+def get_default_profiles(cap, profile):
+ profile1 = ""
+ profile2 = ""
+ for profiles in cap:
+ if profiles[0] == profile and int(profiles[1].split('x')[0]) != 640:
+ profile1 = profiles[1]
+ break
+ for profiles in cap:
+ if profiles[0] == profile and int(profiles[1].split('x')[0]) != int(profile1.split('x')[0]):
+ profile2 = profiles[1]
+ break
+ debug_print(profile + " default profile1:" + profile1)
+ debug_print(profile + " default profile2:" + profile2)
+ return profile1,profile2
+
+
+
+device_info_string = "Device info:"
+
+def get_device_info_location(long_data, index=0):
+ for line_no in range(index, len(long_data)):
+ if device_info_string in long_data[line_no]:
+ return line_no
+ return len(long_data)
+
+stream_profile_string = "Stream Profiles supported by"
+def get_stream_profile_location(long_data, start_index, end_index, profile_string):
+ for line_no in range(start_index, end_index):
+ if stream_profile_string in long_data[line_no]:
+ if profile_string in long_data[line_no]:
+ return line_no
+ return None
+
+def get_depth_profiles(long_data, start_index, end_index):
+ cap = []
+ for line_no in range(start_index, end_index):
+ if len(long_data[line_no]) == 0:
+ break
+ debug_print("depth profile processing:" + long_data[line_no])
+ depth_profile = long_data[line_no].split()
+ if len(depth_profile) == 5:
+ profile = depth_profile[0]
+ value = depth_profile[1]+"x"+depth_profile[3]
+ format = depth_profile[4]
+ elif len(depth_profile) == 6:
+ profile = depth_profile[0]+depth_profile[1]
+ value = depth_profile[2]+"x"+depth_profile[4]
+ format = depth_profile[5]
+ else:
+ assert false, "Seems that the depth profile info format printed by rs-enumerate-devices changed"
+ value = value[:-2]
+ debug_print("depth profile added: " + profile, value, format)
+ cap.append([profile, value, format])
+ debug_print(cap)
+ return cap
+
+
+def get_color_profiles(long_data, start_index, end_index):
+ cap = []
+ for line_no in range(start_index, end_index):
+ if len(long_data[line_no]) == 0:
+ break
+ debug_print("color profile processing:" + long_data[line_no])
+ color_profile = long_data[line_no].split()
+ if len(color_profile) == 5:
+ profile = color_profile[0]
+ value = color_profile[1]+"x"+color_profile[3]
+ format = color_profile[4]
+ else:
+ assert false, "Seems that the color profile info format printed by rs-enumerate-devices changed"
+ value = value[:-2]
+ debug_print("color profile added: " + profile, value, format)
+ cap.append([profile, value, format])
+ debug_print(cap)
+ return cap
+
+NAME_LINE_INDEX = 1
+NAME_LINE_NAME_OFFSET = 4
+SERIAL_NO_LINE_INDEX = 2
+SERIAL_NO_VALUE_OFFSET = 3
+def parse_device_info(long_data, start_index, end_index, device_type, serial_no):
+ #after device_info, the next line should have the name and device type
+ capability = {}
+ debug_print("Searching for data between lines ", str(start_index) + " and " + str(end_index))
+ name_line = long_data[start_index+NAME_LINE_INDEX].split()
+ if name_line[0] != "Name":
+ assert False, "rs-enumerate-devices output format changed"
+ if name_line[4] != device_type:
+ debug_print("device not matching:" + name_line[NAME_LINE_NAME_OFFSET])
+ return None
+ debug_print("device matched:" + name_line[NAME_LINE_NAME_OFFSET])
+ if serial_no != None:
+ #next line after nameline should have the serial_no
+ serial_no_line = long_data[start_index+SERIAL_NO_LINE_INDEX].split()
+ if serial_no_line[0] != "Serial":
+ assert False, "rs-enumerate-devices output format changed"
+ if serial_no_line[SERIAL_NO_VALUE_OFFSET] != serial_no:
+ debug_print("serial_no not matching:" + serial_no_line[SERIAL_NO_VALUE_OFFSET])
+ return None
+ debug_print("serial_no matched:" + serial_no_line[SERIAL_NO_VALUE_OFFSET])
+ else:
+ serial_no = long_data[start_index+SERIAL_NO_LINE_INDEX].split()[SERIAL_NO_VALUE_OFFSET]
+
+ capability["device_type"] = device_type
+ capability["serial_no"] = serial_no
+ depth_profile_index = get_stream_profile_location(long_data, start_index, end_index, "Stereo Module")
+ if depth_profile_index != None:
+ capability["depth_profile"] = get_depth_profiles(long_data, depth_profile_index+3, end_index)
+ rgb_profile_index = get_stream_profile_location(long_data, start_index, end_index, "RGB Camera")
+ if rgb_profile_index != None:
+ capability["color_profile"] = get_color_profiles(long_data, rgb_profile_index+3, end_index)
+ return capability
+
+def get_camera_capabilities(device_type, serial_no=None):
+ long_data = os.popen("rs-enumerate-devices").read().splitlines()
+ debug_print(serial_no)
+ index = 0
+ while index < len(long_data):
+ index = get_device_info_location(long_data, index)
+ if index == len(long_data):
+ return
+ else:
+ debug_print("DeviceInfo found at: " + str(index))
+ start_index = index
+ index += 1
+ index = get_device_info_location(long_data, index)
+ capability = parse_device_info(long_data, start_index, index, device_type, serial_no)
+ if capability != None:
+ return capability
+ return None
+
+def get_camera_capabilities_short(device_type, serial_no=None):
+ short_data = os.popen("rs-enumerate-devices -s").read().splitlines()
+ print(serial_no)
+ for line in short_data:
+ print(line)
+ if device_type in line:
+ if serial_no is None or serial_no == "" :
+ print(device_type+ " found in " + line)
+ return
+ if serial_no in line:
+ print(device_type + " with serial_no " + serial_no +" found in " + line)
+ return
+ print(device_type + " not found")
+
+def check_if_camera_connected(device_type, serial_no=None):
+ long_data = os.popen("rs-enumerate-devices -s").read().splitlines()
+ debug_print(serial_no)
+ index = 0
+ for index in range(len(long_data)):
+ name_line = long_data[index].split()
+ if name_line[0] != "Intel":
+ continue
+ if name_line[2] != device_type:
+ continue
+ if serial_no == None:
+ return True
+ if serial_no == name_line[3]:
+ return True
+
+ return False
+
+if __name__ == '__main__':
+ device_type = 'D455'
+ serial_no = None
+ if len(sys.argv) > 1:
+ device_type = sys.argv[1]
+ if len(sys.argv) > 2:
+ serial_no = sys.argv[2]
+ cap = get_camera_capabilities(device_type, serial_no)
+ print("Capabilities:")
+ print(cap)
\ No newline at end of file
diff --git a/realsense2_camera/test/utils/pytest_rs_utils.py b/realsense2_camera/test/utils/pytest_rs_utils.py
index bdd1bdba1a..db9c09c3c0 100644
--- a/realsense2_camera/test/utils/pytest_rs_utils.py
+++ b/realsense2_camera/test/utils/pytest_rs_utils.py
@@ -15,8 +15,13 @@
import sys
import time
from collections import deque
+import functools
+import itertools
+import subprocess
+
import pytest
+
import numpy as np
from launch import LaunchDescription
@@ -28,10 +33,31 @@
from rclpy import qos
from rclpy.node import Node
from rclpy.utilities import ok
+from ros2topic.verb.bw import ROSTopicBandwidth
+from ros2topic.verb.hz import ROSTopicHz
+from ros2topic.api import get_msg_class
import ctypes
import struct
import requests
+import math
+
+#from rclpy.parameter import Parameter
+from rcl_interfaces.msg import Parameter
+from rcl_interfaces.msg import ParameterValue
+from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters
+'''
+humble doesn't have the SetParametersResult and SetParameters_Response imported using
+__init__.py. The below two lines can be used for iron and hopefully succeeding ROS2 versions
+'''
+#from rcl_interfaces.msg import SetParametersResult
+#from rcl_interfaces.srv import SetParameters_Response
+from rcl_interfaces.msg._set_parameters_result import SetParametersResult
+from rcl_interfaces.srv._set_parameters import SetParameters_Response
+
+from rcl_interfaces.msg import ParameterType
+from rcl_interfaces.msg import ParameterValue
+
from sensor_msgs.msg import Image as msg_Image
@@ -54,6 +80,11 @@
import tempfile
import os
import requests
+
+def debug_print(*args):
+ if(False):
+ print(*args)
+
class RosbagManager(object):
def __new__(cls):
if not hasattr(cls, 'instance'):
@@ -82,12 +113,13 @@ def init(self):
def get_rosbag_path(self, filename):
if filename in self.rosbag_files:
return self.rosbag_location + "/" + filename
-rosbagMgr = RosbagManager()
+
def get_rosbag_file_path(filename):
+ rosbagMgr = RosbagManager()
path = rosbagMgr.get_rosbag_path(filename)
assert path, "No rosbag file found :"+filename
return path
-
+get_rosbag_file_path.rosbagMgr = None
def CameraInfoGetData(rec_filename, topic):
data = importRosbag(rec_filename, importTopics=[topic], log='ERROR', disable_bar=True)[topic]
@@ -412,6 +444,17 @@ def kill_realsense2_camera_node():
os.system(cmd)
pass
+'''
+function gets all the topics for a camera node
+'''
+
+def get_all_topics(camera_name=None):
+ cmd = 'ros2 topic list'
+ if camera_name!=None:
+ cmd += '| grep ' + camera_name
+ direct_output = os.popen(cmd).read()
+ return direct_output
+
'''
get the default parameters from the launch script so that the test doesn't have to
get updated for each change to the parameter or default values
@@ -480,7 +523,7 @@ def get_rs_node_description(params):
#name=LaunchConfiguration("camera_name"),
namespace=params["camera_namespace"],
name=params["camera_name"],
- #prefix=['xterm -e gdb --args'],
+ #prefix=['xterm -e gdb -ex=run --args'],
executable='realsense2_camera_node',
parameters=[tmp_yaml.name],
output='screen',
@@ -552,6 +595,16 @@ def delayed_launch_descr_with_parameters(request):
actions = [
first_node,], period=period)
]),params
+class HzWrapper(ROSTopicHz):
+ def _callback_hz(self, m, topic=None):
+ if self.get_last_printed_tn(topic=topic) == 0:
+ self.set_last_printed_tn(self.get_msg_tn(topic=topic), topic=topic)
+ return self.callback_hz(m, topic)
+ def restart_topic(self, topic):
+ self._last_printed_tn[topic] = 0
+ self._times[topic].clear()
+ self._msg_tn[topic] = 0
+ self._msg_t0[topic] = -1
'''
This is that holds the test node that listens to a subscription created by a test.
@@ -564,6 +617,7 @@ def __init__(self, name='test_node'):
self.data = {}
self.tfBuffer = None
self.frame_counter = {}
+ self._ros_topic_hz = HzWrapper(super(), 10000, use_wtime=False)
def wait_for_node(self, node_name, timeout=8.0):
start = time.time()
@@ -576,14 +630,23 @@ def wait_for_node(self, node_name, timeout=8.0):
return True, ""
time.sleep(timeout/5)
return False, "Timed out waiting for "+ str(timeout)+ "seconds"
- def create_subscription(self, msg_type, topic , data_type, store_raw_data):
- super().create_subscription(msg_type, topic , self.rsCallback(topic,msg_type, store_raw_data), data_type)
+ def reset_data(self, topic):
self.data[topic] = deque()
self.frame_counter[topic] = 0
- if (self.tfBuffer == None):
+ self._ros_topic_hz.restart_topic(topic)
+
+ def create_subscription(self, msg_type, topic , data_type, store_raw_data, measure_hz):
+ self.reset_data(topic)
+ super().create_subscription(msg_type, topic , self.rsCallback(topic,msg_type, store_raw_data), data_type)
+ #hz measurements are not working
+ if measure_hz == True:
+ msg_class = get_msg_class(super(), topic, blocking=True, include_hidden_topics=True)
+ super().create_subscription(msg_class,topic,functools.partial(self._ros_topic_hz._callback_hz, topic=topic),data_type)
+ self._ros_topic_hz.set_last_printed_tn(0, topic=topic)
+
+ if self.tfBuffer == None:
self.tfBuffer = tf2_ros.Buffer()
self.tf_listener = tf2_ros.TransformListener(self.tfBuffer, super())
-
def get_num_chunks(self,topic):
return len(self.data[topic])
@@ -592,14 +655,14 @@ def pop_first_chunk(self, topic):
def image_msg_to_numpy(self, data):
fmtString = data.encoding
- if fmtString in ['mono8', '8UC1', 'bgr8', 'rgb8', 'bgra8', 'rgba8']:
+ if fmtString in ['mono8', '8UC1', 'bgr8', 'rgb8', 'bgra8', 'rgba8', 'yuv422_yuy2', 'yuv422']:
img = np.frombuffer(data.data, np.uint8)
elif fmtString in ['mono16', '16UC1', '16SC1']:
img = np.frombuffer(data.data, np.uint16)
elif fmtString == '32FC1':
img = np.frombuffer(data.data, np.float32)
else:
- print('image format not supported:' + fmtString)
+ print('py_rs_utils.image_msg_to_numpy:image format not supported:' + fmtString)
return None
depth = data.step / (data.width * img.dtype.itemsize)
@@ -613,9 +676,12 @@ def image_msg_to_numpy(self, data):
The processing of data is taken from the existing testcase in scripts rs2_test
'''
def rsCallback(self, topic, msg_type, store_raw_data):
- print("RSCallback")
+ debug_print("RSCallback")
def _rsCallback(data):
- print('Got the callback for ' + topic)
+ '''
+ enabling prints in callback reduces the fps in some cases
+ '''
+ debug_print('Got the callback for ' + topic)
#print(data.header)
self.flag = True
if store_raw_data == True:
@@ -707,23 +773,78 @@ def init_test(self,name='RsTestNode'):
self.flag = False
self.node = RsTestNode(name)
self.subscribed_topics = []
- def create_subscription(self, msg_type, topic, data_type, store_raw_data=False):
+
+ def wait_for_node(self, node_name, timeout=8.0):
+ self.node.wait_for_node(node_name, timeout)
+ def create_subscription(self, msg_type, topic, data_type, store_raw_data=False, measure_hz=False):
if not topic in self.subscribed_topics:
- self.node.create_subscription(msg_type, topic, data_type, store_raw_data)
+ self.node.create_subscription(msg_type, topic, data_type, store_raw_data, measure_hz)
self.subscribed_topics.append(topic)
+ else:
+ self.node.reset_data(topic)
- def spin_for_data(self,themes):
- start = time.time()
+
+
+ def create_param_ifs(self, camera_name):
+ self.set_param_if = self.node.create_client(SetParameters, camera_name + '/set_parameters')
+ self.get_param_if = self.node.create_client(GetParameters, camera_name + '/get_parameters')
+ while not self.get_param_if.wait_for_service(timeout_sec=1.0):
+ print('service not available, waiting again...')
+ while not self.set_param_if.wait_for_service(timeout_sec=1.0):
+ print('service not available, waiting again...')
+
+ def send_param(self, req):
+ future = self.set_param_if.call_async(req)
+ while rclpy.ok():
+ rclpy.spin_once(self.node)
+ if future.done():
+ try:
+ response = future.result()
+ if response.results[0].successful:
+ return True
+ except Exception as e:
+ print("exception raised:")
+ print(e)
+ pass
+ return False
+
+ def set_string_param(self, param_name, param_value):
+ req = SetParameters.Request()
+ new_param_value = ParameterValue(type=ParameterType.PARAMETER_STRING, string_value=param_value)
+ req.parameters = [Parameter(name=param_name, value=new_param_value)]
+ return self.send_param(req)
+
+ def set_bool_param(self, param_name, param_value):
+ req = SetParameters.Request()
+ new_param_value = ParameterValue(type=ParameterType.PARAMETER_BOOL, bool_value=param_value)
+ req.parameters = [Parameter(name=param_name, value=new_param_value)]
+ return self.send_param(req)
+
+ def set_integer_param(self, param_name, param_value):
+ req = SetParameters.Request()
+ new_param_value = ParameterValue(type=ParameterType.PARAMETER_INTEGER, integer_value=param_value)
+ req.parameters = [Parameter(name=param_name, value=new_param_value)]
+ return self.send_param(req)
+
+ def spin_for_data(self,themes, timeout=5.0):
'''
timeout value varies depending upon the system, it needs to be more if
the access is over the network
'''
- timeout = 25.0
print('Waiting for topic... ' )
flag = False
+ data_not_expected1 = [i for i in themes if (i["expected_data_chunks"]) == 0]
+ if data_not_expected1 == []:
+ data_not_expected = False
+ else:
+ data_not_expected = True
+ start = time.time()
+ msg = ""
while (time.time() - start) < timeout:
rclpy.spin_once(self.node, timeout_sec=1)
- print('Spun once... ' )
+ debug_print('Spun once... ' )
+ if data_not_expected == True:
+ continue
all_found = True
for theme in themes:
if theme['expected_data_chunks'] > int(self.node.get_num_chunks(theme['topic'])):
@@ -733,27 +854,51 @@ def spin_for_data(self,themes):
flag =True
break
else:
- print("Timed out waiting for", timeout, "seconds" )
- return False, "run_test timedout"
- return flag,""
+ if data_not_expected == False:
+ msg = "Timedout: Data expected, but not received for: "
+ for theme in themes:
+ if theme['expected_data_chunks'] > int(self.node.get_num_chunks(theme['topic'])):
+ msg += " " + theme['topic']
+ msg += " Nodes available: " + str(self.node.get_node_names())
+ return False, msg
+ flag = True
+ return flag,msg
def spin_for_time(self,wait_time):
start = time.time()
- print('Waiting for topic... ' )
+ print('Waiting for time... ' )
flag = False
- while time.time() - start < wait_time:
- rclpy.spin_once(self.node)
- print('Spun once... ' )
+ while (time.time() - start) < wait_time:
+ print('Spun for time once... ' )
+ rclpy.spin_once(self.node, timeout_sec=wait_time)
- def run_test(self, themes):
+ def run_test(self, themes, initial_wait_time=0.0, timeout=0):
try:
for theme in themes:
store_raw_data = False
if 'store_raw_data' in theme:
store_raw_data = theme['store_raw_data']
- self.create_subscription(theme['msg_type'], theme['topic'] , qos.qos_profile_sensor_data,store_raw_data)
+ if 'expected_fps_in_hz' in theme:
+ measure_hz = True
+ else:
+ measure_hz = False
+ qos_type = qos.qos_profile_sensor_data
+ if 'qos' in theme:
+ qos_type = theme['qos']
+ self.create_subscription(theme['msg_type'], theme['topic'] , qos_type,store_raw_data, measure_hz)
print('subscription created for ' + theme['topic'])
- self.flag = self.spin_for_data(themes)
+ '''
+ change the default based on whether data is expected or not
+ '''
+ if timeout == 0:
+ timeout = 5.0
+ data_not_expected1 = [i for i in themes if (i["expected_data_chunks"]) == 0]
+ if data_not_expected1 == []:
+ timeout = 50.0 #high value due to resource constraints in CI
+
+ if initial_wait_time != 0.0:
+ self.spin_for_time(initial_wait_time)
+ self.flag = self.spin_for_data(themes, timeout)
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
@@ -764,8 +909,31 @@ def run_test(self, themes):
else:
print(e)
self.flag =False,e
-
return self.flag
+
+ def get_transform_data(self, data, coupled_frame_ids, is_static=False):
+ tfBuffer = tf2_ros.Buffer()
+ for transform in data.transforms:
+ if is_static:
+ tfBuffer.set_transform_static(transform, "default_authority")
+ else:
+ tfBuffer.set_transform(transform, "default_authority")
+ res = dict()
+ for couple in coupled_frame_ids:
+ from_id, to_id = couple
+ if (tfBuffer.can_transform(from_id, to_id, rclpy.time.Time(), rclpy.time.Duration(nanoseconds=3e6))):
+ res[couple] = tfBuffer.lookup_transform(from_id, to_id, rclpy.time.Time(), rclpy.time.Duration(nanoseconds=1e6)).transform
+ else:
+ res[couple] = None
+ return res
+ def check_transform_data(self, data, frame_ids, is_static=False):
+ coupled_frame_ids = [xx for xx in itertools.combinations(frame_ids, 2)]
+ res = self.get_transform_data(data, coupled_frame_ids, is_static)
+ for couple in coupled_frame_ids:
+ if res[couple] == None:
+ return False, str(couple) + ": didn't get any tf data"
+ return True,""
+
'''
Please override and use your own process_data if the default check is not suitable.
Please also store_raw_data parameter in the themes as True, if you want the
@@ -773,11 +941,25 @@ def run_test(self, themes):
'''
def process_data(self, themes):
for theme in themes:
+ if theme['expected_data_chunks'] == 0:
+ assert self.node.get_num_chunks(theme['topic']) == 0, "Received data, when not expected for topic:" + theme['topic']
+ continue #no more checks needed if data is not available
+
+ if 'expected_fps_in_hz' in theme:
+ hz = self.node._ros_topic_hz.get_hz(theme['topic'])
+ if hz == None:
+ print("Couldn't measure fps, no of data frames expected are enough for the measurement?")
+ else:
+ speed= 1e9*hz[0]
+ msg = "FPS in Hz of topic " + theme['topic'] + " is " + str(speed) + ". Expected is " + str(theme['expected_fps_in_hz'])
+ print(msg)
+ if (abs(theme['expected_fps_in_hz']-speed) > theme['expected_fps_in_hz']/10):
+ assert False,msg
data = self.node.pop_first_chunk(theme['topic'])
if 'width' in theme:
- assert theme['width'] == data['shape'][0][1] # (get from numpy image the width)
+ assert theme['width'] == data['shape'][0][1], "Width not matched. Expected:" + str(theme['width']) + " & got: " + str(data['shape'][0][1]) # (get from numpy image the width)
if 'height' in theme:
- assert theme['height'] == data['shape'][0][0] # (get from numpy image the height)
+ assert theme['height'] == data['shape'][0][0], "Height not matched. Expected:" + str(theme['height']) + " & got: " + str(data['shape'][0][0]) # (get from numpy image the height)
if 'data' not in theme:
print('No data to compare for ' + theme['topic'])
#print(data)