From 747a72ee774c65113b6f738fd8b87c586ae21e1b Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 3 Jul 2024 15:20:01 -0700 Subject: [PATCH] Implement custom functions to adapt qos (#234) Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 3 +- rmw_zenoh_cpp/package.xml | 1 - rmw_zenoh_cpp/src/detail/logging.cpp | 2 - rmw_zenoh_cpp/src/detail/logging.hpp | 5 - rmw_zenoh_cpp/src/detail/qos.cpp | 134 ++++++++++++++++++++ rmw_zenoh_cpp/src/detail/qos.hpp | 56 ++++++++ rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 12 +- rmw_zenoh_cpp/src/rmw_qos.cpp | 2 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 64 ++++------ 9 files changed, 223 insertions(+), 56 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/qos.cpp create mode 100644 rmw_zenoh_cpp/src/detail/qos.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index a67c7c6f..9565a304 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,7 +21,6 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) -find_package(rmw_dds_common REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc_debug QUIET) if(NOT zenohc_debug_FOUND) @@ -38,6 +37,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/liveliness_utils.cpp src/detail/logging.cpp src/detail/message_type_support.cpp + src/detail/qos.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp @@ -65,7 +65,6 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw - rmw_dds_common::rmw_dds_common_library zenohc::lib ) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 928bb09f..2297a0b1 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -22,7 +22,6 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw - rmw_dds_common ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/logging.cpp b/rmw_zenoh_cpp/src/detail/logging.cpp index fdbb2fc8..b9ed8cbe 100644 --- a/rmw_zenoh_cpp/src/detail/logging.cpp +++ b/rmw_zenoh_cpp/src/detail/logging.cpp @@ -14,8 +14,6 @@ #include "logging.hpp" -#include - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/logging.hpp b/rmw_zenoh_cpp/src/detail/logging.hpp index e81c0372..171d5694 100644 --- a/rmw_zenoh_cpp/src/detail/logging.hpp +++ b/rmw_zenoh_cpp/src/detail/logging.hpp @@ -17,11 +17,6 @@ #include -#include -#include -#include -#include - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/qos.cpp b/rmw_zenoh_cpp/src/detail/qos.cpp new file mode 100644 index 00000000..87a336f1 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/qos.cpp @@ -0,0 +1,134 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "qos.hpp" + +// Define defaults for various QoS settings. +#define RMW_ZENOH_DEFAULT_HISTORY RMW_QOS_POLICY_HISTORY_KEEP_LAST; +// If the depth field in the qos profile is set to 0, the RMW implementation +// has the liberty to assign a default depth. The zenoh transport protocol +// is configured with 256 channels so theoretically, this would be the maximum +// depth we can set before blocking transport. A high depth would increase the +// memory footprint of processes as more messages are stored in memory while a +// very low depth might unintentionally drop messages leading to a poor +// out-of-the-box experience for new users. For now we set the depth to 42, +// a popular "magic number". See https://en.wikipedia.org/wiki/42_(number). +#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 42; +#define RMW_ZENOH_DEFAULT_RELIABILITY RMW_QOS_POLICY_RELIABILITY_RELIABLE; +#define RMW_ZENOH_DEFAULT_DURABILITY RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; +#define RMW_ZENOH_DEFAULT_DEADLINE RMW_DURATION_INFINITE; +#define RMW_ZENOH_DEFAULT_LIFESPAN RMW_DURATION_INFINITE; +#define RMW_ZENOH_DEFAULT_LIVELINESS RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; +#define RMW_ZENOH_DEFAULT_LIVELINESS_LEASE_DURATION RMW_DURATION_INFINITE; + +namespace rmw_zenoh_cpp +{ +///============================================================================= +QoS::QoS() +{ + default_qos_.history = RMW_ZENOH_DEFAULT_HISTORY; + default_qos_.depth = RMW_ZENOH_DEFAULT_HISTORY_DEPTH; + default_qos_.reliability = RMW_ZENOH_DEFAULT_RELIABILITY; + default_qos_.durability = RMW_ZENOH_DEFAULT_DURABILITY; + default_qos_.deadline = RMW_ZENOH_DEFAULT_DEADLINE; + default_qos_.lifespan = RMW_ZENOH_DEFAULT_LIFESPAN; + default_qos_.liveliness = RMW_ZENOH_DEFAULT_LIVELINESS; + default_qos_.liveliness_lease_duration = RMW_ZENOH_DEFAULT_LIVELINESS_LEASE_DURATION; +} + +///============================================================================= +QoS & QoS::get() +{ + static QoS qos; + return qos; +} + +///============================================================================= +const rmw_qos_profile_t & QoS::default_qos() const +{ + return default_qos_; +} + +///============================================================================= +rmw_ret_t QoS::best_available_qos( + const rmw_node_t * node, + const char * topic_name, + rmw_qos_profile_t * qos_profile, + const GetEndpointInfoByTopicFunction & get_endpoint_info_for_other) const +{ + // We could rely on the GetEndpointInfoByTopicFunction callback to get + // endpoint information of downstream consumers to match certain QoS settings. + // Practically since Zenoh transport will succeed for any combination of + // settings, this could only be useful to avoid creating transient_local + // subscriptions when there are no transient_local publishers in the graph. + // This will avoid some overhead with having QueryingSubscriber. For now, + // we skip this optimization. + static_cast(node); + static_cast(topic_name); + static_cast(get_endpoint_info_for_other); + + switch (qos_profile->history) { + case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_RELIABILITY_UNKNOWN: + qos_profile->history = default_qos_.history; + default: + break; + } + + if (qos_profile->depth == 0) { + qos_profile->depth = default_qos_.depth; + } + + switch (qos_profile->reliability) { + case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_RELIABILITY_UNKNOWN: + qos_profile->reliability = default_qos_.reliability; + default: + break; + } + + switch (qos_profile->durability) { + case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_DURABILITY_UNKNOWN: + qos_profile->durability = default_qos_.durability; + default: + break; + } + + if (rmw_time_equal(qos_profile->deadline, RMW_QOS_DEADLINE_DEFAULT)) { + qos_profile->deadline = default_qos_.deadline; + } + + if (rmw_time_equal(qos_profile->lifespan, RMW_QOS_LIFESPAN_DEFAULT)) { + qos_profile->lifespan = default_qos_.lifespan; + } + + if (rmw_time_equal( + qos_profile->liveliness_lease_duration, + RMW_QOS_LIVELINESS_LEASE_DURATION_DEFAULT)) + { + qos_profile->liveliness_lease_duration = default_qos_.liveliness_lease_duration; + } + + switch (qos_profile->liveliness) { + case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_LIVELINESS_UNKNOWN: + qos_profile->liveliness = default_qos_.liveliness; + default: + break; + } + + return RMW_RET_OK; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/qos.hpp b/rmw_zenoh_cpp/src/detail/qos.hpp new file mode 100644 index 00000000..eee74c05 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/qos.hpp @@ -0,0 +1,56 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__QOS_HPP_ +#define DETAIL__QOS_HPP_ + +#include +#include +#include + +#include + +namespace rmw_zenoh_cpp +{ +//============================================================================== +/// Signature matching rmw_get_publishers_info_by_topic and rmw_get_subscriptions_info_by_topic +using GetEndpointInfoByTopicFunction = std::function; + +//============================================================================== +class QoS +{ +public: + static QoS & get(); + + const rmw_qos_profile_t & default_qos() const; + + rmw_ret_t best_available_qos( + const rmw_node_t * node, + const char * topic_name, + rmw_qos_profile_t * qos_profile, + const GetEndpointInfoByTopicFunction & get_endpoint_info_for_other) const; + +private: + // Private constructor which initializes the default qos. + QoS(); + rmw_qos_profile_t default_qos_; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__QOS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 2166c72d..eae1fdfc 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -142,7 +142,9 @@ void rmw_subscription_data_t::add_new_message( { std::lock_guard lock(message_queue_mutex_); - if (message_queue_.size() >= adapted_qos_profile.depth) { + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + message_queue_.size() >= adapted_qos_profile.depth) + { // Log warning if message is discarded due to hitting the queue depth RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", @@ -238,7 +240,9 @@ void rmw_service_data_t::notify() void rmw_service_data_t::add_new_query(std::unique_ptr query) { std::lock_guard lock(query_queue_mutex_); - if (query_queue_.size() >= adapted_qos_profile.depth) { + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + query_queue_.size() >= adapted_qos_profile.depth) + { // Log warning if message is discarded due to hitting the queue depth z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); RMW_ZENOH_LOG_ERROR_NAMED( @@ -331,7 +335,9 @@ void rmw_client_data_t::notify() void rmw_client_data_t::add_new_reply(std::unique_ptr reply) { std::lock_guard lock(reply_queue_mutex_); - if (reply_queue_.size() >= adapted_qos_profile.depth) { + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + reply_queue_.size() >= adapted_qos_profile.depth) + { // Log warning if message is discarded due to hitting the queue depth z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr)); RMW_ZENOH_LOG_ERROR_NAMED( diff --git a/rmw_zenoh_cpp/src/rmw_qos.cpp b/rmw_zenoh_cpp/src/rmw_qos.cpp index 445ee68f..2b3b7cc0 100644 --- a/rmw_zenoh_cpp/src/rmw_qos.cpp +++ b/rmw_zenoh_cpp/src/rmw_qos.cpp @@ -20,8 +20,6 @@ #include "rmw/types.h" #include "rmw/qos_profiles.h" -#include "rmw_dds_common/qos.hpp" - extern "C" { namespace diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3e4660f1..21f72b34 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -35,6 +35,7 @@ #include "detail/liveliness_utils.hpp" #include "detail/logging_macros.hpp" #include "detail/message_type_support.hpp" +#include "detail/qos.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" @@ -55,18 +56,6 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" -#include "rmw_dds_common/qos.hpp" - -// If the depth field in the qos profile is set to 0, the RMW implementation -// has the liberty to assign a default depth. The zenoh transport protocol -// is configured with 256 channels so theoretically, this would be the maximum -// depth we can set before blocking transport. A high depth would increase the -// memory footprint of processes as more messages are stored in memory while a -// very low depth might unintentionally drop messages leading to a poor -// out-of-the-box experience for new users. For now we set the depth to 42, -// a popular "magic number". See https://en.wikipedia.org/wiki/42_(number). -#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 42; - namespace { //============================================================================== @@ -548,16 +537,14 @@ rmw_create_publisher( // Adapt any 'best available' QoS options publisher_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( + rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( node, topic_name, &publisher_data->adapted_qos_profile, rmw_get_subscriptions_info_by_topic); if (RMW_RET_OK != ret) { return nullptr; } - // If a depth of 0 was provided, the RMW implementation should choose a suitable default. - publisher_data->adapted_qos_profile.depth = - publisher_data->adapted_qos_profile.depth > 0 ? - publisher_data->adapted_qos_profile.depth : - RMW_ZENOH_DEFAULT_HISTORY_DEPTH; + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "create publisher liveliness: %zu", + publisher_data->adapted_qos_profile.liveliness); publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_hash = type_support->get_type_hash_func(type_support); @@ -1381,17 +1368,12 @@ rmw_create_subscription( // Adapt any 'best available' QoS options sub_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( + rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( node, topic_name, &sub_data->adapted_qos_profile, rmw_get_publishers_info_by_topic); if (RMW_RET_OK != ret) { RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); return nullptr; } - // If a depth of 0 was provided, the RMW implementation should choose a suitable default. - sub_data->adapted_qos_profile.depth = - sub_data->adapted_qos_profile.depth > 0 ? - sub_data->adapted_qos_profile.depth : - RMW_ZENOH_DEFAULT_HISTORY_DEPTH; sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_hash = type_support->get_type_hash_func(type_support); @@ -2160,13 +2142,13 @@ rmw_create_client( generate_random_gid(client_data->client_gid); // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = - rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); - // If a depth of 0 was provided, the RMW implementation should choose a suitable default. - client_data->adapted_qos_profile.depth = - client_data->adapted_qos_profile.depth > 0 ? - client_data->adapted_qos_profile.depth : - RMW_ZENOH_DEFAULT_HISTORY_DEPTH; + client_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( + nullptr, nullptr, &client_data->adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); @@ -2631,7 +2613,7 @@ rmw_create_service( const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, const char * service_name, - const rmw_qos_profile_t * qos_profiles) + const rmw_qos_profile_t * qos_profile) { // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); @@ -2646,8 +2628,8 @@ rmw_create_service( RMW_SET_ERROR_MSG("service_name argument is an empty string"); return nullptr; } - RMW_CHECK_ARGUMENT_FOR_NULL(qos_profiles, nullptr); - if (!qos_profiles->avoid_ros_namespace_conventions) { + RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); + if (!qos_profile->avoid_ros_namespace_conventions) { int validation_result = RMW_TOPIC_VALID; // TODO(francocipollone): Verify if this is the right way to validate the service name. rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); @@ -2729,13 +2711,13 @@ rmw_create_service( }); // Adapt any 'best available' QoS options - service_data->adapted_qos_profile = - rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles); - // If a depth of 0 was provided, the RMW implementation should choose a suitable default. - service_data->adapted_qos_profile.depth = - service_data->adapted_qos_profile.depth > 0 ? - service_data->adapted_qos_profile.depth : - RMW_ZENOH_DEFAULT_HISTORY_DEPTH; + service_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( + nullptr, nullptr, &service_data->adapted_qos_profile, nullptr); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);