From f757f3ac35cff1db0c12f3bc18e234c87a93c606 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 11 Jan 2024 19:03:47 +0800 Subject: [PATCH] Update graph cache and introspection methods with qos Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 21 ++- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 135 +++++++++++++----- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 6 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 20 +-- 4 files changed, 125 insertions(+), 57 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 80cd089a..43166ff1 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -174,11 +174,10 @@ void GraphCache::parse_put(const std::string & keyexpr) RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", - "Added %s on topic %s with type %s and qos %s to node /%s.", + "Added %s on topic %s with type %s to node /%s.", entity_desc.c_str(), topic_info.name_.c_str(), topic_info.type_.c_str(), - topic_info.qos_.c_str(), graph_node.name_.c_str()); }; @@ -409,11 +408,10 @@ void GraphCache::parse_del(const std::string & keyexpr) RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", - "Removed %s on topic %s with type %s and qos %s to node /%s.", + "Removed %s on topic %s with type %s to node /%s.", entity_desc.c_str(), topic_info.name_.c_str(), topic_info.type_.c_str(), - topic_info.qos_.c_str(), graph_node.name_.c_str()); }; @@ -921,7 +919,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( entity_type == EntityType::Publisher ? nodes[i]->pubs_ : nodes[i]->subs_; const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second; - for (const auto & [topic_data, _] : topic_data_map) { + for (const auto & [topic_type, topic_data] : topic_data_map) { rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; endpoint_info = rmw_get_zero_initialized_topic_endpoint_info(); @@ -943,7 +941,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ret = rmw_topic_endpoint_info_set_topic_type( &endpoint_info, - _demangle_if_ros_type(topic_data).c_str(), + _demangle_if_ros_type(topic_type).c_str(), allocator); if (RMW_RET_OK != ret) { return ret; @@ -955,7 +953,16 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( if (RMW_RET_OK != ret) { return ret; } - // TODO(Yadunund): Set type_hash, qos_profile, gid. + + ret = rmw_topic_endpoint_info_set_qos_profile( + &endpoint_info, + &topic_data->info_.qos_ + ); + if (RMW_RET_OK != ret) { + return ret; + } + + // TODO(Yadunund): Set type_hash, gid. } } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 7118b64b..66aab243 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -47,7 +47,7 @@ NodeInfo::NodeInfo( TopicInfo::TopicInfo( std::string name, std::string type, - std::string qos) + rmw_qos_profile_t qos) : name_(std::move(name)), type_(std::move(type)), qos_(std::move(qos)) @@ -67,6 +67,7 @@ static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; static const char SLASH_REPLACEMENT = '%'; +static const char QOS_DELIMITER = ':'; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -84,6 +85,12 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; +// Map string literals to qos enums. +// static const std::unordered_map str_to_qos_e = { +// {"reliable", RMW_QOS_POLICY_RELIABILITY_RELIABLE}, +// {"best_effort", RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT} +// }; + std::string zid_to_str(z_id_t id) { std::stringstream ss; @@ -96,6 +103,84 @@ std::string zid_to_str(z_id_t id) return ss.str(); } +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') +{ + std::vector delim_idx = {}; + // Insert -1 for starting position to make the split easier when using substr. + delim_idx.push_back(-1); + std::size_t idx = 0; + for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { + if (*it == delim) { + delim_idx.push_back(idx); + } + ++idx; + } + std::vector result = {}; + try { + for (std::size_t i = 1; i < delim_idx.size(); ++i) { + const size_t prev_idx = delim_idx[i - 1]; + const size_t idx = delim_idx[i]; + result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); + } + } catch (const std::exception & e) { + printf("%s\n", e.what()); + return {}; + } + // Finally add the last substr. + result.push_back(keyexpr.substr(delim_idx.back() + 1)); + return result; +} + +/** + * Convert a rmw_qos_profile_t to a string with format: + * + * ::," + * Where: + * - "reliable" or "best_effort". + * - "volatile" or "transient_local". + * - "keep_last". + * - The depth number. + */ +// TODO(Yadunund): Rely on maps to retrieve strings. +std::string qos_to_keyexpr(rmw_qos_profile_t qos) +{ + std::string keyexpr = ""; + keyexpr += qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE ? "reliable" : "best_effort"; + keyexpr += QOS_DELIMITER; + keyexpr += qos.durability == + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile"; + keyexpr += QOS_DELIMITER; + // TODO(Yadunund): Update once we properly support History. + keyexpr += "keep_last,"; + keyexpr += std::to_string(qos.depth); + return keyexpr; +} + +/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. +std::optional keyexpr_to_qos(const std::string & keyexpr) +{ + rmw_qos_profile_t qos; + const std::vector parts = split_keyexpr(keyexpr, QOS_DELIMITER); + if (parts.size() < 3) { + return std::nullopt; + } + qos.reliability = parts[0] == + "reliable" ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + qos.durability = parts[1] == + "transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : + RMW_QOS_POLICY_DURABILITY_VOLATILE; + const std::vector history_parts = split_keyexpr(parts[2], ','); + if (history_parts.size() < 2) { + return std::nullopt; + } + qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + sscanf(history_parts[1].c_str(), "%zu", &qos.depth); + + return qos; +} + } // namespace ///============================================================================= @@ -156,7 +241,8 @@ Entity::Entity( const auto & topic_info = this->topic_info_.value(); // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". token_ss << - "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; + "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + qos_to_keyexpr( + topic_info.qos_); } this->keyexpr_ = token_ss.str(); @@ -186,41 +272,6 @@ std::optional Entity::make( return entity; } -namespace -{ -///============================================================================= -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const size_t prev_idx = delim_idx[i - 1]; - const size_t idx = delim_idx[i]; - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} - -} // namespace - ///============================================================================= std::optional Entity::make(const std::string & keyexpr) { @@ -276,10 +327,18 @@ std::optional Entity::make(const std::string & keyexpr) "Received liveliness token for non-node entity without required parameters."); return std::nullopt; } + std::optional qos = keyexpr_to_qos(parts[8]); + if (!qos.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid qos keyexpr"); + return std::nullopt; + } topic_info = TopicInfo{ demangle_name(std::move(parts[6])), std::move(parts[7]), - std::move(parts[8])}; + std::move(qos.value()) + }; } return Entity{ diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index c8bd12b4..ad5e3bc2 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -21,6 +21,8 @@ #include #include +#include "rmw/types.h" + namespace liveliness { @@ -45,12 +47,12 @@ struct TopicInfo { std::string name_; std::string type_; - std::string qos_; + rmw_qos_profile_t qos_; TopicInfo( std::string name, std::string type, - std::string qos); + rmw_qos_profile_t qos); }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index b37f3445..c1c03041 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -456,12 +456,12 @@ rmw_create_publisher( } } // Adapt any 'best available' QoS options - // rmw_qos_profile_t adapted_qos_profile = *qos_profile; - // rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( - // node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - // if (RMW_RET_OK != ret) { - // return nullptr; - // } + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( + node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); if (publisher_options->require_unique_network_flow_endpoints == RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) @@ -621,7 +621,7 @@ rmw_create_publisher( liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, - publisher_data->type_support->get_name(), "reliable"} + publisher_data->type_support->get_name(), adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1335,7 +1335,7 @@ rmw_create_subscription( liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, - sub_data->type_support->get_name(), "reliable"} + sub_data->type_support->get_name(), adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1905,7 +1905,7 @@ rmw_create_client( liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, - std::move(service_type), "reliable"} + std::move(service_type), *qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2493,7 +2493,7 @@ rmw_create_service( liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, - std::move(service_type), "reliable"} + std::move(service_type), *qos_profiles} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED(