Skip to content

Commit

Permalink
Added tf and tf_static tests
Browse files Browse the repository at this point in the history
  • Loading branch information
PrasRsRos committed Sep 5, 2023
1 parent a20eb25 commit c80e471
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 15 deletions.
4 changes: 2 additions & 2 deletions realsense2_camera/test/live_camera/test_camera_fps_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from rcl_interfaces.msg import ParameterValue
from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters

test_params_depth_avg_1 = {
test_params_test_fps = {
'camera_name': 'D455',
'device_type': 'D455',
}
Expand All @@ -47,7 +47,7 @@
modified to make it work, see py_rs_utils for more details.
To check the fps, a value 'expected_fps_in_hz' has to be added to the corresponding theme
'''
@pytest.mark.parametrize("launch_descr_with_parameters", [test_params_depth_avg_1],indirect=True)
@pytest.mark.parametrize("launch_descr_with_parameters", [test_params_test_fps],indirect=True)
@pytest.mark.launch(fixture=launch_descr_with_parameters)
class TestCamera_TestFPS(pytest_rs_utils.RsTestBaseClass):
def test_camera_test_fps(self,launch_descr_with_parameters):
Expand Down
219 changes: 219 additions & 0 deletions realsense2_camera/test/live_camera/test_camera_tf_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Copyright 2023 Intel Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import sys
import itertools


import pytest
import rclpy

from rclpy.qos import DurabilityPolicy
from rclpy.qos import HistoryPolicy
from rclpy.qos import QoSProfile
import tf2_ros


from sensor_msgs.msg import Image as msg_Image
from sensor_msgs.msg import Imu as msg_Imu
from sensor_msgs.msg import PointCloud2 as msg_PointCloud2
from tf2_msgs.msg import TFMessage as msg_TFMessage

import numpy as np

sys.path.append(os.path.abspath(os.path.dirname(__file__)+"/../utils"))
import pytest_rs_utils
from pytest_rs_utils import launch_descr_with_parameters

import pytest_live_camera_utils
from pytest_rs_utils import get_rosbag_file_path
from pytest_rs_utils import get_node_heirarchy

from rclpy.parameter import Parameter
from rcl_interfaces.msg import ParameterValue
from rcl_interfaces.srv import SetParameters, GetParameters, ListParameters

'''
The test was implemented to check the tf_static before and after infra1 was enabled. There shouldn't be any
related data before enabling.
'''
test_params_tf_static_change_d455 = {
'camera_name': 'D455',
'device_type': 'D455',
'enable_infra1': 'false',
'enable_infra2': 'true',
'enable_accel': 'true',
'enable_gyro': 'true',
}
test_params_tf_static_change_d435 = {
'camera_name': 'D435',
'device_type': 'D435',
'enable_infra1': 'false',
'enable_infra2': 'true',
'enable_accel': 'true',
'enable_gyro': 'true',
}

test_params_tf_static_change_d415 = {
'camera_name': 'D415',
'device_type': 'D415',
'enable_infra1': 'false',
'enable_infra2': 'true',
'enable_accel': 'true',
'enable_gyro': 'true',
}
@pytest.mark.parametrize("launch_descr_with_parameters", [
pytest.param(test_params_tf_static_change_d455, marks=pytest.mark.d455),
pytest.param(test_params_tf_static_change_d435, marks=pytest.mark.d435),
pytest.param(test_params_tf_static_change_d415, marks=pytest.mark.d415),
],indirect=True)
@pytest.mark.launch(fixture=launch_descr_with_parameters)
class TestCamera_TestTF_Static_change(pytest_rs_utils.RsTestBaseClass):
def test_camera_test_tf_static_change(self,launch_descr_with_parameters):
self.params = launch_descr_with_parameters[1]
if pytest_live_camera_utils.check_if_camera_connected(self.params['device_type']) == False:
print("Device not found? : " + self.params['device_type'])
return
themes = [
{'topic':'/tf_static',
'msg_type':msg_TFMessage,
'expected_data_chunks':1,
'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
}
]
try:
'''
initialize, run and check the data
'''
self.init_test("RsTest"+self.params['camera_name'])
self.wait_for_node(self.params['camera_name'])
self.create_param_ifs(get_node_heirarchy(self.params))
ret = self.run_test(themes, timeout=10)
assert ret[0], ret[1]
frame_ids = [self.params['camera_name']+'_link',
self.params['camera_name']+'_depth_frame',
self.params['camera_name']+'_infra1_frame',
self.params['camera_name']+'_infra2_frame',
self.params['camera_name']+'_color_frame']
if self.params['device_type'] == 'D455':
frame_ids.append(self.params['camera_name']+'_gyro_frame')
frame_ids.append(self.params['camera_name']+'_accel_frame')

ret = self.process_data(themes, False)
assert ret[0], ret[1]
assert self.set_bool_param('enable_infra1', True)

ret = self.run_test(themes, timeout=10)
assert ret[0], ret[1]
ret = self.process_data(themes, True)
assert ret[0], ret[1]
finally:
self.shutdown()
def process_data(self, themes, enable_infra1):
frame_ids = [self.params['camera_name']+'_link',
self.params['camera_name']+'_depth_frame',
self.params['camera_name']+'_infra2_frame',
self.params['camera_name']+'_color_frame']
if self.params['device_type'] == 'D455':
frame_ids.append(self.params['camera_name']+'_gyro_frame')
frame_ids.append(self.params['camera_name']+'_accel_frame')
if enable_infra1:
frame_ids.append(self.params['camera_name']+'_infra1_frame')

data = self.node.pop_first_chunk('/tf_static')
ret = self.check_transform_data(data, frame_ids, True)
return ret


test_params_tf_d455 = {
'camera_name': 'D455',
'device_type': 'D455',
'publish_tf': 'true',
'tf_publish_rate': '1.1',
}

test_params_tf_d435 = {
'camera_name': 'D435',
'device_type': 'D435',
'publish_tf': 'true',
'tf_publish_rate': '1.1',
}

test_params_tf_d415 = {
'camera_name': 'D415',
'device_type': 'D415',
'publish_tf': 'true',
'tf_publish_rate': '1.1',
}
@pytest.mark.parametrize("launch_descr_with_parameters", [
pytest.param(test_params_tf_d455, marks=pytest.mark.d455),
pytest.param(test_params_tf_d435, marks=pytest.mark.d415),
pytest.param(test_params_tf_d415, marks=pytest.mark.d415),
],indirect=True)
@pytest.mark.launch(fixture=launch_descr_with_parameters)
class TestCamera_TestTF_DYN(pytest_rs_utils.RsTestBaseClass):
def test_camera_test_tf_dyn(self,launch_descr_with_parameters):
self.params = launch_descr_with_parameters[1]
if pytest_live_camera_utils.check_if_camera_connected(self.params['device_type']) == False:
print("Device not found? : " + self.params['device_type'])
return
themes = [
{'topic':'/tf',
'msg_type':msg_TFMessage,
'expected_data_chunks':3,
'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.VOLATILE,history=HistoryPolicy.KEEP_LAST,)
#'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
},
{'topic':'/tf_static',
'msg_type':msg_TFMessage,
'expected_data_chunks':1,
#'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.VOLATILE,history=HistoryPolicy.KEEP_LAST,)
'qos' : QoSProfile(depth=100,durability=DurabilityPolicy.TRANSIENT_LOCAL,history=HistoryPolicy.KEEP_LAST,)#static
}
]
try:
'''
initialize, run and check the data
'''
self.init_test("RsTest"+self.params['camera_name'])
self.wait_for_node(self.params['camera_name'])
self.create_param_ifs(get_node_heirarchy(self.params))
ret = self.run_test(themes, timeout=10)
assert ret[0], ret[1]
ret = self.process_data(themes, False)
assert ret[0], ret[1]
finally:
self.shutdown()

def process_data(self, themes, enable_infra1):
frame_ids = [self.params['camera_name']+'_link',
self.params['camera_name']+'_depth_frame',
self.params['camera_name']+'_color_frame']
data = self.node.pop_first_chunk('/tf_static')
ret = self.check_transform_data(data, frame_ids, True)
assert ret[0], ret[1]
data = self.node.pop_first_chunk('/tf')
ret = self.check_transform_data(data, frame_ids)
assert ret[0], ret[1]
data = self.node.pop_first_chunk('/tf')
ret = self.check_transform_data(data, frame_ids)
assert ret[0], ret[1]
data = self.node.pop_first_chunk('/tf')
ret = self.check_transform_data(data, frame_ids)
assert ret[0], ret[1]
#return True, ""

return True, ""
17 changes: 17 additions & 0 deletions realsense2_camera/test/utils/pytest_live_camera_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ def get_camera_capabilities(device_type, serial_no=None):
return capability
return None

def check_if_camera_connected(device_type, serial_no=None):
long_data = os.popen("rs-enumerate-devices -s").read().splitlines()
debug_print(serial_no)
index = 0
for index in range(len(long_data)):
name_line = long_data[index].split()
if name_line[0] != "Intel":
continue
if name_line[2] != device_type:
continue
if serial_no == None:
return True
if serial_no == name_line[3]:
return True

return False

if __name__ == '__main__':
device_type = 'D455'
serial_no = None
Expand Down
44 changes: 31 additions & 13 deletions realsense2_camera/test/utils/pytest_rs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import requests

def debug_print(*args):
if(True):
if(False):
print(*args)

class RosbagManager(object):
Expand Down Expand Up @@ -623,7 +623,7 @@ def reset_data(self, topic):
self.frame_counter[topic] = 0
self._ros_topic_hz.restart_topic(topic)

def create_subscription(self, msg_type, topic , data_type, store_raw_data, measure_hz, static_tf):
def create_subscription(self, msg_type, topic , data_type, store_raw_data, measure_hz):
self.reset_data(topic)
super().create_subscription(msg_type, topic , self.rsCallback(topic,msg_type, store_raw_data), data_type)
#hz measurements are not working
Expand All @@ -632,7 +632,7 @@ def create_subscription(self, msg_type, topic , data_type, store_raw_data, measu
super().create_subscription(msg_class,topic,functools.partial(self._ros_topic_hz._callback_hz, topic=topic),data_type)
self._ros_topic_hz.set_last_printed_tn(0, topic=topic)

if (static_tf == True) and (self.tfBuffer == None):
if self.tfBuffer == None:
self.tfBuffer = tf2_ros.Buffer()
self.tf_listener = tf2_ros.TransformListener(self.tfBuffer, super())
def get_tfs(self, coupled_frame_ids):
Expand Down Expand Up @@ -771,9 +771,9 @@ def init_test(self,name='RsTestNode'):

def wait_for_node(self, node_name, timeout=8.0):
self.node.wait_for_node(node_name, timeout)
def create_subscription(self, msg_type, topic, data_type, store_raw_data=False, measure_hz=False, static_tf=False):
def create_subscription(self, msg_type, topic, data_type, store_raw_data=False, measure_hz=False):
if not topic in self.subscribed_topics:
self.node.create_subscription(msg_type, topic, data_type, store_raw_data, measure_hz, static_tf)
self.node.create_subscription(msg_type, topic, data_type, store_raw_data, measure_hz)
self.subscribed_topics.append(topic)
else:
self.node.reset_data(topic)
Expand Down Expand Up @@ -859,7 +859,6 @@ def spin_for_time(self,wait_time):

def run_test(self, themes, initial_wait_time=0.0, timeout=5.0):
try:
static_tf_found = False
for theme in themes:
store_raw_data = False
if 'store_raw_data' in theme:
Expand All @@ -868,13 +867,10 @@ def run_test(self, themes, initial_wait_time=0.0, timeout=5.0):
measure_hz = True
else:
measure_hz = False
if 'static_tf' in theme:
static_tf = True
static_tf_found = True
else:
static_tf = False

self.create_subscription(theme['msg_type'], theme['topic'] , qos.qos_profile_sensor_data,store_raw_data, measure_hz, static_tf)
qos_type = qos.qos_profile_sensor_data
if 'qos' in theme:
qos_type = theme['qos']
self.create_subscription(theme['msg_type'], theme['topic'] , qos_type,store_raw_data, measure_hz)
print('subscription created for ' + theme['topic'])
if initial_wait_time != 0.0:
self.spin_for_time(initial_wait_time)
Expand All @@ -893,6 +889,28 @@ def run_test(self, themes, initial_wait_time=0.0, timeout=5.0):
return self.flag
def get_tfs(self, coupled_frame_ids):
return self.node.get_tfs(coupled_frame_ids)


def check_transform_data(self, data, frame_ids, is_static=False):
coupled_frame_ids = [xx for xx in itertools.combinations(frame_ids, 2)]
tfBuffer = tf2_ros.Buffer()
for transform in data.transforms:
if is_static:
tfBuffer.set_transform_static(transform, "default_authority")
else:
tfBuffer.set_transform(transform, "default_authority")
res = dict()
for couple in coupled_frame_ids:
from_id, to_id = couple
if (tfBuffer.can_transform(from_id, to_id, rclpy.time.Time(), rclpy.time.Duration(nanoseconds=3e6))):
res[couple] = tfBuffer.lookup_transform(from_id, to_id, rclpy.time.Time(), rclpy.time.Duration(nanoseconds=1e6)).transform
else:
res[couple] = None
for couple in coupled_frame_ids:
if res[couple] == None:
return False, str(couple) + ": didn't get any tf data"
return True,""

'''
Please override and use your own process_data if the default check is not suitable.
Please also store_raw_data parameter in the themes as True, if you want the
Expand Down

0 comments on commit c80e471

Please sign in to comment.