From a9fcee49d7bb48917e1d27628680fa43b564bef0 Mon Sep 17 00:00:00 2001 From: PrasRsRos Date: Fri, 4 Aug 2023 15:24:36 +0530 Subject: [PATCH] Added live camera test for changing profile based on enumerated data --- .../test/d455/test_d455_all_profile_tests.py | 101 +++++++++++ .../test/d455/test_d455_basic_tests.py | 14 +- .../test/utils/pytest_live_camera_utils.py | 163 ++++++++++++++++++ .../test/utils/pytest_rs_utils.py | 33 +++- 4 files changed, 300 insertions(+), 11 deletions(-) create mode 100644 realsense2_camera/test/d455/test_d455_all_profile_tests.py create mode 100644 realsense2_camera/test/utils/pytest_live_camera_utils.py diff --git a/realsense2_camera/test/d455/test_d455_all_profile_tests.py b/realsense2_camera/test/d455/test_d455_all_profile_tests.py new file mode 100644 index 0000000000..6ce8b8789f --- /dev/null +++ b/realsense2_camera/test/d455/test_d455_all_profile_tests.py @@ -0,0 +1,101 @@ +# Copyright 2023 Intel Corporation. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import sys +import itertools + + +import pytest +import rclpy + +from sensor_msgs.msg import Image as msg_Image +from sensor_msgs.msg import Imu as msg_Imu +from sensor_msgs.msg import PointCloud2 as msg_PointCloud2 + +import numpy as np + +sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils")) +import pytest_rs_utils +from pytest_rs_utils import launch_descr_with_parameters +from pytest_rs_utils import delayed_launch_descr_with_parameters +from pytest_rs_utils import get_rosbag_file_path +import pytest_live_camera_utils +from rclpy.parameter import Parameter +from rcl_interfaces.msg import ParameterValue +from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters + +test_params_all_profiles = { + 'camera_name': 'D455', + 'device_type': 'D455', + } +''' +This test was implemented as a template to set the parameters and run the test. +This directory is not added to the CMakeLists.txt so as to avoid the colcon failure in the +machines that don't have the D455 connected. +1. Only a subset of parameter types are implemented in py_rs_utils, it has to be extended for others +2. After setting the param, rclpy.spin_once may be needed.Test passes even without this though. +''' +@pytest.mark.parametrize("launch_descr_with_parameters", [test_params_all_profiles],indirect=True) +@pytest.mark.launch(fixture=launch_descr_with_parameters) +class TestD455_Change_Resolution(pytest_rs_utils.RsTestBaseClass): + def test_D455_Change_Resolution(self,launch_descr_with_parameters): + params = launch_descr_with_parameters[1] + themes = [ + {'topic':'/'+params['camera_name']+'/color/image_raw', + 'msg_type':msg_Image, + 'expected_data_chunks':1, + #'data':data + } + ] + try: + ''' + initialize, run and check the data + ''' + serial_no = None + if 'serial_no' in params: + serial_no = params['serial_no'] + cap = pytest_live_camera_utils.get_camera_capabilities(params['device_type'], serial_no) + self.init_test("RsTest"+params['camera_name']) + self.create_param_ifs(params['camera_name'] + '/' + params['camera_name']) + for profile in cap["color_profile"]: + if profile[0] == 'Color': + print("Testing :" + profile[1] + " " + profile[2]) + if "BGRA8" == profile[2]: + print("Skipping BGRA8" ) + continue + if "RGBA8" == profile[2]: + print("Skipping RGBA8" ) + continue + if "Y8" == profile[2]: + print("Skipping Y8" ) + continue + self.set_string_param('rgb_camera.profile', profile[1]) + self.set_string_param('rgb_camera.color_format', profile[2]) + self.set_bool_param('enable_color', True) + themes[0]['width'] = int(profile[1].split('x')[0]) + themes[0]['height'] = int(profile[1].split('x')[1]) + ret = self.run_test(themes) + assert ret[0], ret[1] + assert self.process_data(themes) + + ret = self.run_test(themes) + assert self.process_data(themes) + finally: + #this step is important because the test will fail next time + pytest_rs_utils.kill_realsense2_camera_node() + self.shutdown() + + diff --git a/realsense2_camera/test/d455/test_d455_basic_tests.py b/realsense2_camera/test/d455/test_d455_basic_tests.py index b2b8d0cf31..2fccb2d29b 100644 --- a/realsense2_camera/test/d455/test_d455_basic_tests.py +++ b/realsense2_camera/test/d455/test_d455_basic_tests.py @@ -30,7 +30,7 @@ sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils")) import pytest_rs_utils from pytest_rs_utils import launch_descr_with_parameters -from pytest_rs_utils import delayed_launch_descr_with_parameters + from pytest_rs_utils import get_rosbag_file_path from rclpy.parameter import Parameter @@ -66,19 +66,23 @@ def test_D455_Change_Resolution(self,launch_descr_with_parameters): ''' initialize, run and check the data ''' + print("Starting camera test...") self.init_test("RsTest"+params['camera_name']) + self.wait_for_node(params['camera_name']) self.create_param_ifs(params['camera_name'] + '/' + params['camera_name']) - self.set_string_param('rgb_camera.profile', '640x480x30') - self.set_bool_param('enable_color', True) + #assert self.set_bool_param('enable_color', False) + assert self.set_string_param('rgb_camera.profile', '640x480x30') + assert self.set_bool_param('enable_color', True) ret = self.run_test(themes) assert ret[0], ret[1] assert self.process_data(themes) - self.set_string_param('rgb_camera.profile', '1280x720x15') + self.set_string_param('rgb_camera.profile', '1280x800x5') self.set_bool_param('enable_color', True) themes[0]['width'] = 1280 - themes[0]['height'] = 720 + themes[0]['height'] = 800 ret = self.run_test(themes) + assert ret[0], ret[1] assert self.process_data(themes) finally: #this step is important because the test will fail next time diff --git a/realsense2_camera/test/utils/pytest_live_camera_utils.py b/realsense2_camera/test/utils/pytest_live_camera_utils.py new file mode 100644 index 0000000000..82c36c38d8 --- /dev/null +++ b/realsense2_camera/test/utils/pytest_live_camera_utils.py @@ -0,0 +1,163 @@ +# Copyright 2023 Intel Corporation. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import sys +import time +import ctypes +import struct +import requests +import json + +def debug_print(*args): + if(False): + print(*args) +def get_camera_capabilities_short(device_type, serial_no=None): + short_data = os.popen("rs-enumerate-devices -s").read().splitlines() + print(serial_no) + for line in short_data: + print(line) + if device_type in line: + if serial_no is None or serial_no == "" : + print(device_type+ " found in " + line) + return + if serial_no in line: + print(device_type + " with serial_no " + serial_no +" found in " + line) + return + print(device_type + " not found") + +device_info_string = "Device info:" + +def get_device_info_location(long_data, index=0): + for line_no in range(index, len(long_data)): + if device_info_string in long_data[line_no]: + return line_no + return len(long_data) + +stream_profile_string = "Stream Profiles supported by" +def get_stream_profile_location(long_data, start_index, end_index, profile_string): + for line_no in range(start_index, end_index): + if stream_profile_string in long_data[line_no]: + if profile_string in long_data[line_no]: + return line_no + return None + +def get_depth_profiles(long_data, start_index, end_index): + cap = [] + for line_no in range(start_index, end_index): + if len(long_data[line_no]) == 0: + break + debug_print("depth profile processing:" + long_data[line_no]) + depth_profile = long_data[line_no].split() + if len(depth_profile) == 5: + profile = depth_profile[0] + value = depth_profile[1]+"x"+depth_profile[3] + format = depth_profile[4] + elif len(depth_profile) == 6: + profile = depth_profile[0]+depth_profile[1] + value = depth_profile[2]+"x"+depth_profile[4] + format = depth_profile[5] + else: + assert false, "Seems that the depth profile info format printed by rs-enumerate-devices changed" + value = value[:-2] + debug_print("depth profile added: " + profile, value, format) + cap.append([profile, value, format]) + debug_print(cap) + return cap + + +def get_color_profiles(long_data, start_index, end_index): + cap = [] + for line_no in range(start_index, end_index): + if len(long_data[line_no]) == 0: + break + debug_print("color profile processing:" + long_data[line_no]) + color_profile = long_data[line_no].split() + if len(color_profile) == 5: + profile = color_profile[0] + value = color_profile[1]+"x"+color_profile[3] + format = color_profile[4] + else: + assert false, "Seems that the color profile info format printed by rs-enumerate-devices changed" + value = value[:-2] + debug_print("color profile added: " + profile, value, format) + cap.append([profile, value, format]) + debug_print(cap) + return cap + +NAME_LINE_INDEX = 1 +NAME_LINE_NAME_OFFSET = 4 +SERIAL_NO_LINE_INDEX = 2 +SERIAL_NO_VALUE_OFFSET = 3 +def parse_device_info(long_data, start_index, end_index, device_type, serial_no): + #after device_info, the next line should have the name and device type + capability = {} + debug_print("Searching for data between lines ", str(start_index) + " and " + str(end_index)) + name_line = long_data[start_index+NAME_LINE_INDEX].split() + if name_line[0] != "Name": + assert False, "rs-enumerate-devices output format changed" + if name_line[4] != device_type: + debug_print("device not matching:" + name_line[NAME_LINE_NAME_OFFSET]) + return None + debug_print("device matched:" + name_line[NAME_LINE_NAME_OFFSET]) + if serial_no != None: + #next line after nameline should have the serial_no + serial_no_line = long_data[start_index+SERIAL_NO_LINE_INDEX].split() + if serial_no_line[0] != "Serial": + assert False, "rs-enumerate-devices output format changed" + if serial_no_line[SERIAL_NO_VALUE_OFFSET] != serial_no: + debug_print("serial_no not matching:" + serial_no_line[SERIAL_NO_VALUE_OFFSET]) + return None + debug_print("serial_no matched:" + serial_no_line[SERIAL_NO_VALUE_OFFSET]) + else: + serial_no = long_data[start_index+SERIAL_NO_LINE_INDEX].split()[SERIAL_NO_VALUE_OFFSET] + + capability["device_type"] = device_type + capability["serial_no"] = serial_no + depth_profile_index = get_stream_profile_location(long_data, start_index, end_index, "Stereo Module") + if depth_profile_index != None: + capability["depth_profile"] = get_depth_profiles(long_data, depth_profile_index+3, end_index) + rgb_profile_index = get_stream_profile_location(long_data, start_index, end_index, "RGB Camera") + if rgb_profile_index != None: + capability["color_profile"] = get_color_profiles(long_data, rgb_profile_index+3, end_index) + return capability + +def get_camera_capabilities(device_type, serial_no=None): + long_data = os.popen("rs-enumerate-devices").read().splitlines() + debug_print(serial_no) + index = 0 + while index < len(long_data): + index = get_device_info_location(long_data, index) + if index == len(long_data): + return + else: + debug_print("DeviceInfo found at: " + str(index)) + start_index = index + index += 1 + index = get_device_info_location(long_data, index) + capability = parse_device_info(long_data, start_index, index, device_type, serial_no) + if capability != None: + return capability + return None + +if __name__ == '__main__': + device_type = 'D455' + serial_no = None + if len(sys.argv) > 1: + device_type = sys.argv[1] + if len(sys.argv) > 2: + serial_no = sys.argv[2] + + cap = get_camera_capabilities(device_type, serial_no) + print("Capabilities:") + print(cap) \ No newline at end of file diff --git a/realsense2_camera/test/utils/pytest_rs_utils.py b/realsense2_camera/test/utils/pytest_rs_utils.py index 1983d3f647..20e9fded71 100644 --- a/realsense2_camera/test/utils/pytest_rs_utils.py +++ b/realsense2_camera/test/utils/pytest_rs_utils.py @@ -37,6 +37,8 @@ from rcl_interfaces.msg import Parameter from rcl_interfaces.msg import ParameterValue from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters +from rcl_interfaces.msg import SetParametersResult +from rcl_interfaces.srv import SetParameters_Response from rcl_interfaces.msg import ParameterType from rcl_interfaces.msg import ParameterValue @@ -572,11 +574,12 @@ def __init__(self, name='test_node'): def wait_for_node(self, node_name, timeout=8.0): start = time.time() flag = False - print('Waiting for node... ' + node_name) + print('Waiting for node... ' + str(node_name)) while time.time() - start < timeout: print(node_name + ": waiting for the node to come up") flag = node_name in self.get_node_names() if flag: + print('Node is up: ' + str(node_name)) return True, "" time.sleep(timeout/5) return False, "Timed out waiting for "+ str(timeout)+ "seconds" @@ -711,6 +714,8 @@ def init_test(self,name='RsTestNode'): self.flag = False self.node = RsTestNode(name) self.subscribed_topics = [] + def wait_for_node(self, node_name, timeout=8.0): + self.node.wait_for_node(node_name, timeout) def create_subscription(self, msg_type, topic, data_type, store_raw_data=False): if not topic in self.subscribed_topics: self.node.create_subscription(msg_type, topic, data_type, store_raw_data) @@ -724,17 +729,33 @@ def create_param_ifs(self, camera_name): print('service not available, waiting again...') while not self.set_param_if.wait_for_service(timeout_sec=1.0): print('service not available, waiting again...') + + def send_param(self, req): + future = self.set_param_if.call_async(req) + while rclpy.ok(): + rclpy.spin_once(self.node) + if future.done(): + try: + response = future.result() + if response.results[0].successful: + return True + except Exception as e: + print("exception raised:") + print(e) + pass + return False + def set_string_param(self, param_name, param_value): req = SetParameters.Request() new_param_value = ParameterValue(type=ParameterType.PARAMETER_STRING, string_value=param_value) req.parameters = [Parameter(name=param_name, value=new_param_value)] - future = self.set_param_if.call_async(req) - + return self.send_param(req) + def set_bool_param(self, param_name, param_value): req = SetParameters.Request() new_param_value = ParameterValue(type=ParameterType.PARAMETER_BOOL, bool_value=param_value) req.parameters = [Parameter(name=param_name, value=new_param_value)] - future = self.set_param_if.call_async(req) + return self.send_param(req) def spin_for_data(self,themes): start = time.time() @@ -763,9 +784,9 @@ def spin_for_data(self,themes): def spin_for_time(self,wait_time): start = time.time() - print('Waiting for topic... ' ) + print('Waiting for time... ' ) flag = False - while time.time() - start < wait_time: + while time.time() - (start < wait_time): rclpy.spin_once(self.node) print('Spun once... ' )