From a8e1a662486d273b64495b3b9c5bfd92a712c990 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 11 Jan 2024 17:24:33 +0800 Subject: [PATCH] Create querying subscriber if qos is transient local Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 2 + rmw_zenoh_cpp/package.xml | 1 + rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 4 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 103 +++++++++++++------- 4 files changed, 75 insertions(+), 35 deletions(-) diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 24b70723..85230a8f 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) +find_package(rmw_dds_common REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) @@ -57,6 +58,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw + rmw_dds_common::rmw_dds_common_library zenohc::lib ) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 02abec46..26b2a946 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -19,6 +19,7 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw + rmw_dds_common ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index de45faba..3df33b9d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "rcutils/allocator.h" @@ -110,7 +111,8 @@ struct saved_msg_data ///============================================================================== struct rmw_subscription_data_t { - z_owned_subscriber_t sub; + // An owned subscriber or querying_subscriber depending on the QoS settings. + std::variant sub; // Liveliness token for the subscription. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5653fae6..b37f3445 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -49,6 +49,8 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" +#include "rmw_dds_common/qos.hpp" + namespace { @@ -1137,20 +1139,15 @@ rmw_create_subscription( } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); // Adapt any 'best available' QoS options - // rmw_qos_profile_t adapted_qos_profile = *qos_profile; - // rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( - // node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); - // if (RMW_RET_OK != ret) { - // return nullptr; - // } + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( + node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); - // Check RMW QoS - // if (!is_valid_qos(*qos_profile)) { - // RMW_SET_ERROR_MSG("create_subscription() called with invalid QoS"); - // return nullptr; - // } - const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { // error was already set by find_message_type_support @@ -1216,15 +1213,6 @@ rmw_create_subscription( rmw_subscription_data_t); }); - // Set the reliability of the subscription options based on qos_profile. - // The default options will be initialized with Best Effort reliability. - auto sub_options = z_subscriber_options_default(); - sub_data->reliable = false; - if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; - } - sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -1291,19 +1279,54 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - sub_data->sub = z_declare_subscriber( - z_loan(context_impl->session), - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(sub_data->sub)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return nullptr; + // Instantiate the subscription with suitable options depending on the + // adapted_qos_profile. + // TODO(Yadunund): Rely on a separate function to return the sub + // as we start supporting more qos settings. + sub_data->reliable = false; + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); + if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + sub_data->reliable = true; + sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE; + } + sub_data->sub = ze_declare_querying_subscriber( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } } + // Create a regular subscriber for all other durability settings. + else { + z_subscriber_options_t sub_options = z_subscriber_options_default(); + if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + sub_data->reliable = true; + } + sub_data->sub = z_declare_subscriber( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } + } + auto undeclare_z_sub = rcpputils::make_scope_exit( [sub_data]() { - z_undeclare_subscriber(z_move(sub_data->sub)); + // TODO(Yadunund): Check if this is okay or if it is better + // to cast into explicit types and call appropriate undeclare method. + // See rmw_destroy_subscription() + z_drop(z_move(sub_data->sub)); }); // Publish to the graph that a new subscription is in town @@ -1380,9 +1403,19 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); - if (z_undeclare_subscriber(z_move(sub_data->sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - ret = RMW_RET_ERROR; + z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); + if (sub != nullptr) { + if (z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + ret = RMW_RET_ERROR; + } + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_data->sub); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + ret = RMW_RET_ERROR; + } } RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); @@ -1429,6 +1462,8 @@ rmw_subscription_get_actual_qos( qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + qos->durability = std::holds_alternative(sub_data->sub) ? + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; return RMW_RET_OK; }