From c32ae92e5092208e62fbe9d2844b439f7d8ee0b0 Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 27 Dec 2023 23:28:52 +0800 Subject: [PATCH] Implement method to get endpoints info (#85) Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 108 ++++++++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 8 ++ .../src/rmw_get_topic_endpoint_info.cpp | 45 ++++++-- 3 files changed, 149 insertions(+), 12 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index bcd48a76..b90b40ae 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -683,3 +683,111 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node( return RMW_RET_OK; } + + +///============================================================================= +rmw_ret_t GraphCache::get_entities_info_by_topic( + liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_demangle, + rmw_topic_endpoint_info_array_t * endpoints_info) const +{ + static_cast(no_demangle); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); + RCUTILS_CHECK_ALLOCATOR_WITH_MSG( + allocator, "allocator argument is invalid", return RMW_RET_INVALID_ARGUMENT); + + if (entity_type != EntityType::Publisher && entity_type != EntityType::Subscription) { + return RMW_RET_INVALID_ARGUMENT; + } + + std::lock_guard lock(graph_mutex_); + + // Minor optimization to exit early if the topic does not exist in the graph. + if (graph_topics_.find(topic_name) == graph_topics_.end()) { + return RMW_RET_OK; + } + // TODO(Yadunund): Refactor graph_topics_ to map to a list of GraphNodePtr to + // avoid this expensive iteration. + std::size_t size = 0; + std::vector nodes = {}; + for (NamespaceMap::const_iterator ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + for (NodeMap::const_iterator node_it = ns_it->second.begin(); node_it != ns_it->second.end(); + ++node_it) + { + const GraphNode::TopicMap & entity_map = + entity_type == EntityType::Publisher ? node_it->second->pubs_ : + node_it->second->subs_; + GraphNode::TopicMap::const_iterator topic_it = entity_map.find(topic_name); + if (topic_it != entity_map.end()) { + nodes.push_back(node_it->second); + size += topic_it->second.size(); + } + } + } + + + rmw_ret_t ret = rmw_topic_endpoint_info_array_init_with_size( + endpoints_info, + nodes.size(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + auto cleanup_endpoints_info = rcpputils::make_scope_exit( + [endpoints_info, allocator] { + rmw_ret_t fail_ret = rmw_topic_endpoint_info_array_fini( + endpoints_info, allocator); + if (fail_ret != RMW_RET_OK) { + RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup endpoints info during error handling"); + } + }); + + for (std::size_t i = 0; i < nodes.size(); ++i) { + const GraphNode::TopicMap & entity_map = + entity_type == EntityType::Publisher ? nodes[i]->pubs_ : + nodes[i]->subs_; + const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second; + for (const auto & [topic_data, _] : topic_data_map) { + rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; + endpoint_info = rmw_get_zero_initialized_topic_endpoint_info(); + + ret = rmw_topic_endpoint_info_set_node_name( + &endpoint_info, + nodes[i]->name_.c_str(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + ret = rmw_topic_endpoint_info_set_node_namespace( + &endpoint_info, + nodes[i]->ns_.c_str(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + ret = rmw_topic_endpoint_info_set_topic_type( + &endpoint_info, + _demangle_if_ros_type(topic_data).c_str(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + ret = rmw_topic_endpoint_info_set_endpoint_type( + &endpoint_info, + entity_type == EntityType::Publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION); + if (RMW_RET_OK != ret) { + return ret; + } + // TODO(Yadunund): Set type_hash, qos_profile, gid. + } + } + + cleanup_endpoints_info.cancel(); + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 3d0005f2..c6143d8b 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -29,6 +29,7 @@ #include "rcutils/types.h" #include "rmw/rmw.h" +#include "rmw/get_topic_endpoint_info.h" #include "rmw/names_and_types.h" @@ -108,6 +109,13 @@ class GraphCache final bool no_demangle, rmw_names_and_types_t * names_and_types) const; + rmw_ret_t get_entities_info_by_topic( + liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_demangle, + rmw_topic_endpoint_info_array_t * endpoints_info) const; + private: /* namespace_1: diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index bf435678..12795f41 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -13,7 +13,12 @@ // limitations under the License. +#include "detail/identifier.hpp" +#include "detail/liveliness_utils.hpp" +#include "detail/rmw_data_types.hpp" + #include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" extern "C" { @@ -27,12 +32,20 @@ rmw_get_publishers_info_by_topic( bool no_mangle, rmw_topic_endpoint_info_array_t * publishers_info) { - static_cast(node); - static_cast(allocator); - static_cast(topic_name); - static_cast(no_mangle); - static_cast(publishers_info); - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + return node->context->impl->graph_cache.get_entities_info_by_topic( + liveliness::EntityType::Publisher, + allocator, + topic_name, + no_mangle, + publishers_info); } ///============================================================================== @@ -45,11 +58,19 @@ rmw_get_subscriptions_info_by_topic( bool no_mangle, rmw_topic_endpoint_info_array_t * subscriptions_info) { - static_cast(node); - static_cast(allocator); - static_cast(topic_name); - static_cast(no_mangle); - static_cast(subscriptions_info); - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + return node->context->impl->graph_cache.get_entities_info_by_topic( + liveliness::EntityType::Subscription, + allocator, + topic_name, + no_mangle, + subscriptions_info); } } // extern "C"