diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index b539e5e1..73422269 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -20,7 +20,7 @@ #include "rmw/event.h" ///============================================================================= -// RMW Event types that we support in rmw_zenoh. +// A struct that represents an event status in rmw_zenoh. enum rmw_zenoh_event_type_t { // sentinel value @@ -43,6 +43,7 @@ enum rmw_zenoh_event_type_t /// Helper value to indicate the maximum index of events supported. #define ZENOH_EVENT_ID_MAX rmw_zenoh_event_type_t::ZENOH_EVENT_PUBLICATION_MATCHED +// RMW Event types that we support in rmw_zenoh. static const std::unordered_map event_map{ {RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE}, {RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE}, diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 7d322c20..b35c1309 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -71,10 +71,8 @@ void GraphCache::parse_put(const std::string & keyexpr) auto add_topic_data = [](const Entity & entity, GraphNode & graph_node, GraphCache & graph_cache) -> void { - if (entity.type() != EntityType::Publisher && - entity.type() != EntityType::Subscription && - entity.type() != EntityType::Service && - entity.type() != EntityType::Client) + if (entity.type() == EntityType::Invalid || + entity.type() == EntityType::Node) { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -92,86 +90,80 @@ void GraphCache::parse_put(const std::string & keyexpr) } const liveliness::TopicInfo topic_info = entity.topic_info().value(); - std::string entity_desc = ""; - GraphNode::TopicMap & topic_map = - [](const Entity & entity, GraphNode & graph_node, - std::string & entity_desc) -> GraphNode::TopicMap & - { - if (entity.type() == EntityType::Publisher) { - entity_desc = "publisher"; - return graph_node.pubs_; - } else if (entity.type() == EntityType::Subscription) { - entity_desc = "subscription"; - return graph_node.subs_; - } else if (entity.type() == EntityType::Service) { - entity_desc = "service"; - return graph_node.services_; - - } else { - entity_desc = "client"; - return graph_node.clients_; - } - }(entity, graph_node, entity_desc); // For the sake of reusing data structures and lookup functions, we treat publishers and // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; - TopicDataPtr graph_topic_data = std::make_shared( - topic_info, - TopicStats{pub_count, sub_count}); - - GraphNode::TopicDataMap topic_data_map = { - {graph_topic_data->info_.type_, graph_topic_data}}; - std::pair insertion = - topic_map.insert(std::make_pair(topic_info.name_, topic_data_map)); - if (!insertion.second) { - // A topic with the same name already exists in the node so we append the type. - std::pair type_insertion = - insertion.first->second.insert( - std::make_pair( - graph_topic_data->info_.type_, - graph_topic_data)); - if (!type_insertion.second) { - // We have another instance of a pub/sub over the same topic and type so we increment - // the counters. - TopicDataPtr & existing_graph_topic = type_insertion.first->second; - existing_graph_topic->stats_.pub_count_ += pub_count; - existing_graph_topic->stats_.sub_count_ += sub_count; - } + // Helper lambda to update TopicMap within the node the one for the entire graph. + auto update_topic_map = + [](GraphNode::TopicMap & topic_map, + const liveliness::TopicInfo topic_info, + const std::size_t pub_count, + const std::size_t sub_count) -> void + { + TopicDataPtr graph_topic_data = std::make_shared( + topic_info, + TopicStats{pub_count, sub_count}); + std::string qos_str = liveliness::qos_to_keyexpr(topic_info.qos_); + GraphNode::TopicQoSMap topic_qos_map = { + {qos_str, graph_topic_data}}; + + GraphNode::TopicMap::iterator topic_map_it = topic_map.find(topic_info.name_); + if (topic_map_it == topic_map.end()) { + // First time this topic name is discovered for the node so we insert a TopicTypeMap. + GraphNode::TopicTypeMap topic_data_map = { + {topic_info.type_, std::move(topic_qos_map)} + }; + topic_map.insert(std::make_pair(topic_info.name_, std::move(topic_data_map))); + } else { + // The topic exists for the node so we check if the type also exists. + GraphNode::TopicTypeMap::iterator topic_data_map_it = topic_map_it->second.find( + topic_info.type_); + if (topic_data_map_it == topic_map_it->second.end()) { + // First time this topic type is added. + topic_map_it->second.insert( + std::make_pair( + topic_info.type_, + std::move(topic_qos_map))); + } else { + // The topic type already exists so we check if qos also exists. + GraphNode::TopicQoSMap::iterator topic_qos_map_it = topic_data_map_it->second.find( + qos_str); + if (topic_qos_map_it == topic_data_map_it->second.end()) { + // First time this qos is added. + topic_data_map_it->second.insert(std::make_pair(qos_str, graph_topic_data)); + } else { + // We have another instance of a pub/sub over the same topic, + // type and qos so we increment the counters. + TopicDataPtr & existing_graph_topic = topic_qos_map_it->second; + existing_graph_topic->stats_.pub_count_ += pub_count; + existing_graph_topic->stats_.sub_count_ += sub_count; + } + } + } + }; + + // First update the topic map within the node. + if (entity.type() == EntityType::Publisher) { + update_topic_map(graph_node.pubs_, topic_info, pub_count, sub_count); + } else if (entity.type() == EntityType::Subscription) { + update_topic_map(graph_node.subs_, topic_info, pub_count, sub_count); + } else if (entity.type() == EntityType::Service) { + update_topic_map(graph_node.services_, topic_info, pub_count, sub_count); + } else { + update_topic_map(graph_node.clients_, topic_info, pub_count, sub_count); } - // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph - GraphNode::TopicMap & graph_endpoints = - entity.type() == EntityType::Publisher || entity.type() == EntityType::Subscription ? - graph_cache.graph_topics_ : - graph_cache.graph_services_; - GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_); - if (cache_topic_it == graph_endpoints.end()) { - // First time this topic name is added to the graph. - std::shared_ptr topic_data_ptr = std::make_shared( - topic_info, - TopicStats{pub_count, sub_count} - ); - graph_endpoints[topic_info.name_] = GraphNode::TopicDataMap{ - {topic_info.type_, topic_data_ptr} - }; + // Then update the variables tracking topics across the graph. + // TODO(Yadunund): Check for QoS events. + if (entity.type() == EntityType::Publisher || + entity.type() == EntityType::Subscription) + { + update_topic_map(graph_cache.graph_topics_, topic_info, pub_count, sub_count); } else { - // If a TopicData entry for the same type exists in the topic map, update pub/sub counts - // or else create an new TopicData. - std::pair topic_data_insertion = - cache_topic_it->second.insert(std::make_pair(topic_info.type_, nullptr)); - if (topic_data_insertion.second) { - // A TopicData for the topic_type does not exist. - topic_data_insertion.first->second = std::make_shared( - topic_info, - TopicStats{pub_count, sub_count}); - } else { - // Update the existing counters. - topic_data_insertion.first->second->stats_.pub_count_ += pub_count; - topic_data_insertion.first->second->stats_.sub_count_ += sub_count; - } + update_topic_map(graph_cache.graph_services_, topic_info, pub_count, sub_count); } }; @@ -271,14 +263,12 @@ void GraphCache::parse_del(const std::string & keyexpr) const liveliness::Entity & entity = *valid_entity; // Helper lambda to update graph_topics_. - auto update_graph_topics = - [](const liveliness::TopicInfo topic_info, const EntityType entity_type, std::size_t pub_count, - std::size_t sub_count, GraphCache & graph_cache) -> void + auto update_topic_map = + [](GraphNode::TopicMap & graph_endpoints, + const liveliness::TopicInfo & topic_info, + std::size_t pub_count, + std::size_t sub_count) -> void { - GraphNode::TopicMap & graph_endpoints = - entity_type == EntityType::Publisher || entity_type == EntityType::Subscription ? - graph_cache.graph_topics_ : - graph_cache.graph_services_; GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_); if (cache_topic_it == graph_endpoints.end()) { @@ -287,16 +277,25 @@ void GraphCache::parse_del(const std::string & keyexpr) "rmw_zenoh_cpp", "topic_key %s not found in graph_endpoints. Report this.", topic_info.name_.c_str()); } else { - GraphNode::TopicDataMap::iterator cache_topic_data_it = + GraphNode::TopicTypeMap::iterator cache_topic_data_it = cache_topic_it->second.find(topic_info.type_); if (cache_topic_data_it != cache_topic_it->second.end()) { - // Decrement the relevant counters. If both counters are 0 remove from cache. - cache_topic_data_it->second->stats_.pub_count_ -= pub_count; - cache_topic_data_it->second->stats_.sub_count_ -= sub_count; - if (cache_topic_data_it->second->stats_.pub_count_ == 0 && - cache_topic_data_it->second->stats_.sub_count_ == 0) - { - cache_topic_it->second.erase(cache_topic_data_it); + const std::string qos_str = liveliness::qos_to_keyexpr(topic_info.qos_); + GraphNode::TopicQoSMap::iterator cache_topic_qos_it = cache_topic_data_it->second.find( + qos_str); + if (cache_topic_qos_it != cache_topic_data_it->second.end()) { + // Decrement the relevant counters. If both counters are 0 remove from cache. + cache_topic_qos_it->second->stats_.pub_count_ -= pub_count; + cache_topic_qos_it->second->stats_.sub_count_ -= sub_count; + if (cache_topic_qos_it->second->stats_.pub_count_ == 0 && + cache_topic_qos_it->second->stats_.sub_count_ == 0) + { + cache_topic_data_it->second.erase(qos_str); + } + // If the qos map is empty, erase it from the topic_data_map. + if (cache_topic_data_it->second.empty()) { + cache_topic_it->second.erase(cache_topic_data_it); + } } // If the topic does not have any TopicData entries, erase the topic from the map. if (cache_topic_it->second.empty()) { @@ -306,16 +305,14 @@ void GraphCache::parse_del(const std::string & keyexpr) } }; - // Helper lambda to append pub/subs to the GraphNode. + // Helper lambda to remove pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto remove_topic_data = - [&update_graph_topics](const Entity & entity, GraphNode & graph_node, + [&update_topic_map](const Entity & entity, GraphNode & graph_node, GraphCache & graph_cache) -> void { - if (entity.type() != EntityType::Publisher && - entity.type() != EntityType::Subscription && - entity.type() != EntityType::Service && - entity.type() != EntityType::Client) + if (entity.type() == EntityType::Invalid || + entity.type() == EntityType::Node) { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -332,66 +329,32 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } const liveliness::TopicInfo topic_info = entity.topic_info().value(); - std::string entity_desc = ""; - GraphNode::TopicMap & topic_map = - [](const Entity & entity, GraphNode & graph_node, - std::string & entity_desc) -> GraphNode::TopicMap & - { - if (entity.type() == EntityType::Publisher) { - entity_desc = "publisher"; - return graph_node.pubs_; - } else if (entity.type() == EntityType::Subscription) { - entity_desc = "subscription"; - return graph_node.subs_; - } else if (entity.type() == EntityType::Service) { - entity_desc = "service"; - return graph_node.services_; - - } else { - entity_desc = "client"; - return graph_node.clients_; - } - }(entity, graph_node, entity_desc); // For the sake of reusing data structures and lookup functions, we treat publishers and // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; - GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); - if (topic_it == topic_map.end()) { - // Pub/sub not found. - return; - } - - GraphNode::TopicDataMap & topic_data_map = topic_it->second; - // Search the unordered_set for the TopicData for this topic. - GraphNode::TopicDataMap::iterator topic_data_it = - topic_data_map.find(topic_info.type_); - if (topic_data_it == topic_data_map.end()) { - // Something is wrong. - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", - topic_info.name_.c_str()); - return; + // First update the topic map within the node. + if (entity.type() == EntityType::Publisher) { + update_topic_map(graph_node.pubs_, topic_info, pub_count, sub_count); + } else if (entity.type() == EntityType::Subscription) { + update_topic_map(graph_node.subs_, topic_info, pub_count, sub_count); + } else if (entity.type() == EntityType::Service) { + update_topic_map(graph_node.services_, topic_info, pub_count, sub_count); + } else { + update_topic_map(graph_node.clients_, topic_info, pub_count, sub_count); } - // Decrement the relevant counters. If both counters are 0 remove from graph_node. - TopicDataPtr & existing_topic_data = topic_data_it->second; - existing_topic_data->stats_.pub_count_ -= pub_count; - existing_topic_data->stats_.sub_count_ -= sub_count; - if (existing_topic_data->stats_.pub_count_ == 0 && - existing_topic_data->stats_.sub_count_ == 0) + // Then update the variables tracking topics across the graph. + // TODO(Yadunund): Check for QoS events. + if (entity.type() == EntityType::Publisher || + entity.type() == EntityType::Subscription) { - topic_data_map.erase(topic_data_it); - } - // If the topic does not have any TopicData entries, erase the topic from the map. - if (topic_data_map.empty()) { - topic_map.erase(topic_info.name_); + update_topic_map(graph_cache.graph_topics_, topic_info, pub_count, sub_count); + } else { + update_topic_map(graph_cache.graph_services_, topic_info, pub_count, sub_count); } - - // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache); }; // Lock the graph mutex before accessing the graph. @@ -436,26 +399,34 @@ void GraphCache::parse_del(const std::string & keyexpr) "node have been removed. Removing all pub/subs first...", entity.node_name().c_str() ); - auto remove_topics = - [&update_graph_topics](const GraphNode::TopicMap & topic_map, - const EntityType & entity_type, GraphCache & graph_cache) -> void { - std::size_t pub_count = entity_type == EntityType::Publisher || - entity_type == EntityType::Client ? 1 : 0; - std::size_t sub_count = !pub_count; - for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) { - for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); - ++type_it) + // We update the tracking variables to reduce the count of entities present in this node. + auto remove_topic_map_from_cache = + [&update_topic_map](const GraphNode::TopicMap & to_remove, + GraphNode::TopicMap & from_cache) -> void + { + for (GraphNode::TopicMap::const_iterator topic_it = to_remove.begin(); + topic_it != to_remove.end(); ++topic_it) + { + for (GraphNode::TopicTypeMap::const_iterator topic_type_it = topic_it->second.begin(); + topic_type_it != topic_it->second.end(); ++topic_type_it) { - update_graph_topics( - type_it->second->info_, entity_type, pub_count, sub_count, - graph_cache); + for (GraphNode::TopicQoSMap::const_iterator topic_qos_it = + topic_type_it->second.begin(); + topic_qos_it != topic_type_it->second.end(); ++topic_qos_it) + { + update_topic_map( + from_cache, + topic_qos_it->second->info_, + topic_qos_it->second->stats_.pub_count_, + topic_qos_it->second->stats_.sub_count_); + } } } }; - remove_topics(graph_node->pubs_, EntityType::Publisher, *this); - remove_topics(graph_node->subs_, EntityType::Subscription, *this); - remove_topics(graph_node->services_, EntityType::Service, *this); - remove_topics(graph_node->clients_, EntityType::Client, *this); + remove_topic_map_from_cache(graph_node->pubs_, graph_topics_); + remove_topic_map_from_cache(graph_node->subs_, graph_topics_); + remove_topic_map_from_cache(graph_node->services_, graph_services_); + remove_topic_map_from_cache(graph_node->clients_, graph_services_); } ns_it->second.erase(node_it); total_nodes_in_graph_ -= 1; @@ -632,7 +603,7 @@ rmw_ret_t fill_names_and_types( }); // Fill topic names and types. std::size_t index = 0; - for (const std::pair & item : entity_map) { + for (const std::pair & item : entity_map) { names_and_types->names.data[index] = rcutils_strdup(item.first.c_str(), *allocator); if (!names_and_types->names.data[index]) { return RMW_RET_BAD_ALLOC; @@ -648,7 +619,7 @@ rmw_ret_t fill_names_and_types( } } size_t type_index = 0; - for (const std::pair & type : item.second) { + for (const std::pair & type : item.second) { char * type_name = rcutils_strdup(_demangle_if_ros_type(type.first).c_str(), *allocator); if (!type_name) { RMW_SET_ERROR_MSG("failed to allocate memory for type name"); @@ -686,15 +657,30 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions( const rmw_publisher_t * publisher, size_t * subscription_count) { - // TODO(Yadunund): Check if QoS settings also match. + // TODO(Yadunund): Replace this logic by returning a number that is tracked once + // we support matched qos events. *subscription_count = 0; GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name); if (topic_it != graph_topics_.end()) { rmw_publisher_data_t * pub_data = static_cast(publisher->data); - GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find( + GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find( pub_data->type_support->get_name()); if (topic_data_it != topic_it->second.end()) { - *subscription_count = topic_data_it->second->stats_.sub_count_; + for (const auto & [_, topic_data] : topic_data_it->second) { + // If a subscription exists with compatible QoS, update the subscription count. + if (topic_data->stats_.sub_count_ > 0) { + rmw_qos_compatibility_type_t is_compatible; + rmw_ret_t ret = rmw_qos_profile_check_compatible( + pub_data->adapted_qos_profile, + topic_data->info_.qos_, + &is_compatible, + nullptr, + 0); + if (ret == RMW_RET_OK && is_compatible == RMW_QOS_COMPATIBILITY_OK) { + *subscription_count = *subscription_count + topic_data->stats_.sub_count_; + } + } + } } } @@ -706,15 +692,30 @@ rmw_ret_t GraphCache::subscription_count_matched_publishers( const rmw_subscription_t * subscription, size_t * publisher_count) { - // TODO(Yadunund): Check if QoS settings also match. + // TODO(Yadunund): Replace this logic by returning a number that is tracked once + // we support matched qos events. *publisher_count = 0; GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name); if (topic_it != graph_topics_.end()) { rmw_subscription_data_t * sub_data = static_cast(subscription->data); - GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find( + GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find( sub_data->type_support->get_name()); if (topic_data_it != topic_it->second.end()) { - *publisher_count = topic_data_it->second->stats_.pub_count_; + for (const auto & [_, topic_data] : topic_data_it->second) { + // If a subscription exists with compatible QoS, update the subscription count. + if (topic_data->stats_.pub_count_ > 0) { + rmw_qos_compatibility_type_t is_compatible; + rmw_ret_t ret = rmw_qos_profile_check_compatible( + sub_data->adapted_qos_profile, + topic_data->info_.qos_, + &is_compatible, + nullptr, + 0); + if (ret == RMW_RET_OK && is_compatible == RMW_QOS_COMPATIBILITY_OK) { + *publisher_count = *publisher_count + topic_data->stats_.pub_count_; + } + } + } } } @@ -741,9 +742,13 @@ rmw_ret_t GraphCache::count_publishers( *count = 0; std::lock_guard lock(graph_mutex_); if (graph_topics_.count(topic_name) != 0) { - for (const std::pair & it : graph_topics_.at(topic_name)) { - // Iterate through all the types and increment count. - *count += it.second->stats_.pub_count_; + // Iterate through all the types and increment count. + for (const std::pair & topic_data : graph_topics_.at(topic_name)) + { + for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { + *count += it->second->stats_.pub_count_; + } } } @@ -758,9 +763,13 @@ rmw_ret_t GraphCache::count_subscriptions( *count = 0; std::lock_guard lock(graph_mutex_); if (graph_topics_.count(topic_name) != 0) { - for (const std::pair & it : graph_topics_.at(topic_name)) { - // Iterate through all the types and increment count. - *count += it.second->stats_.sub_count_; + // Iterate through all the types and increment count. + for (const std::pair & topic_data : graph_topics_.at(topic_name)) + { + for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { + *count += it->second->stats_.sub_count_; + } } } @@ -775,9 +784,13 @@ rmw_ret_t GraphCache::count_services( *count = 0; std::lock_guard lock(graph_mutex_); if (graph_services_.count(service_name) != 0) { - for (const std::pair & it : graph_services_.at(service_name)) { - // Iterate through all the types and increment count. - *count += it.second->stats_.sub_count_; + // Iterate through all the types and increment count. + for (const std::pair & topic_data : graph_services_.at(service_name)) + { + for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { + *count += it->second->stats_.sub_count_; + } } } @@ -792,9 +805,13 @@ rmw_ret_t GraphCache::count_clients( *count = 0; std::lock_guard lock(graph_mutex_); if (graph_services_.count(service_name) != 0) { - for (const std::pair & it : graph_services_.at(service_name)) { - // Iterate through all the types and increment count. - *count += it.second->stats_.pub_count_; + // Iterate through all the types and increment count. + for (const std::pair & topic_data : graph_services_.at(service_name)) + { + for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { + *count += it->second->stats_.pub_count_; + } } } @@ -934,51 +951,54 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( const GraphNode::TopicMap & entity_map = entity_type == EntityType::Publisher ? nodes[i]->pubs_ : nodes[i]->subs_; - const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second; - for (const auto & [topic_type, topic_data] : topic_data_map) { - rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; - endpoint_info = rmw_get_zero_initialized_topic_endpoint_info(); - - ret = rmw_topic_endpoint_info_set_node_name( - &endpoint_info, - nodes[i]->name_.c_str(), - allocator); - if (RMW_RET_OK != ret) { - return ret; - } + const GraphNode::TopicTypeMap & topic_data_map = entity_map.find(topic_name)->second; + for (const auto & [topic_type, topic_qos_map] : topic_data_map) { + for (const auto & [_, topic_data] : topic_qos_map) { + rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; + endpoint_info = rmw_get_zero_initialized_topic_endpoint_info(); + + ret = rmw_topic_endpoint_info_set_node_name( + &endpoint_info, + nodes[i]->name_.c_str(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } - ret = rmw_topic_endpoint_info_set_node_namespace( - &endpoint_info, - nodes[i]->ns_.c_str(), - allocator); - if (RMW_RET_OK != ret) { - return ret; - } + ret = rmw_topic_endpoint_info_set_node_namespace( + &endpoint_info, + nodes[i]->ns_.c_str(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } - ret = rmw_topic_endpoint_info_set_topic_type( - &endpoint_info, - _demangle_if_ros_type(topic_type).c_str(), - allocator); - if (RMW_RET_OK != ret) { - return ret; - } + ret = rmw_topic_endpoint_info_set_topic_type( + &endpoint_info, + _demangle_if_ros_type(topic_type).c_str(), + allocator); + if (RMW_RET_OK != ret) { + return ret; + } - ret = rmw_topic_endpoint_info_set_endpoint_type( - &endpoint_info, - entity_type == EntityType::Publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION); - if (RMW_RET_OK != ret) { - return ret; - } + ret = rmw_topic_endpoint_info_set_endpoint_type( + &endpoint_info, + entity_type == + EntityType::Publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION); + if (RMW_RET_OK != ret) { + return ret; + } - ret = rmw_topic_endpoint_info_set_qos_profile( - &endpoint_info, - &topic_data->info_.qos_ - ); - if (RMW_RET_OK != ret) { - return ret; - } + ret = rmw_topic_endpoint_info_set_qos_profile( + &endpoint_info, + &topic_data->info_.qos_ + ); + if (RMW_RET_OK != ret) { + return ret; + } - // TODO(Yadunund): Set type_hash, gid. + // TODO(Yadunund): Set type_hash, gid. + } } } @@ -996,9 +1016,14 @@ rmw_ret_t GraphCache::service_server_is_available( std::lock_guard lock(graph_mutex_); GraphNode::TopicMap::iterator service_it = graph_services_.find(service_name); if (service_it != graph_services_.end()) { - GraphNode::TopicDataMap::iterator type_it = service_it->second.find(service_type); + GraphNode::TopicTypeMap::iterator type_it = service_it->second.find(service_type); if (type_it != service_it->second.end()) { - *is_available = true; + for (const auto & [_, topic_data] : type_it->second) { + if (topic_data->stats_.sub_count_ > 0) { + *is_available = true; + return RMW_RET_OK; + } + } } } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index c207c738..603422ac 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -70,10 +70,12 @@ struct GraphNode // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? std::string enclave_; - // Map topic type to TopicData - using TopicDataMap = std::unordered_map; - // Map topic name to TopicDataMap - using TopicMap = std::unordered_map; + // Map QoS (serialized using liveliness::qos_to_keyexpr) to TopicData + using TopicQoSMap = std::unordered_map; + // Map topic type to QoSMap + using TopicTypeMap = std::unordered_map; + // Map topic name to TopicTypeMap + using TopicMap = std::unordered_map; // Entries for pub/sub. TopicMap pubs_ = {}; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 997f76b3..03ffccfe 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -140,20 +140,9 @@ std::vector split_keyexpr( result.push_back(keyexpr.substr(start)); return result; } +} // namespace -/** - * Convert a rmw_qos_profile_t to a string with format: - * - * ::," - * Where: - * - enum value from rmw_qos_reliability_policy_e. - * - enum value from rmw_qos_durability_policy_e. - * - enum value from rmw_qos_history_policy_e. - * - The depth number. - * For example, the liveliness substring for a topic with Reliability policy: reliable, - * Durability policy: volatile, History policy: keep_last, and depth: 10, would be - * "1:2:1,10". See rmw/types.h for the values of each policy enum. - */ +///============================================================================= // TODO(Yadunund): Rely on maps to retrieve strings. std::string qos_to_keyexpr(rmw_qos_profile_t qos) { @@ -168,7 +157,7 @@ std::string qos_to_keyexpr(rmw_qos_profile_t qos) return keyexpr; } -/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. +///============================================================================= std::optional keyexpr_to_qos(const std::string & keyexpr) { rmw_qos_profile_t qos; @@ -218,7 +207,6 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) qos.lifespan = RMW_QOS_LIFESPAN_DEFAULT; return qos; } -} // namespace ///============================================================================= std::string subscription_token(size_t domain_id) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 390ecc68..89b606dc 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -125,6 +125,25 @@ std::string mangle_name(const std::string & input); /// Replace "%" instances with "/". std::string demangle_name(const std::string & input); +/** + * Convert a rmw_qos_profile_t to a string with format: + * + * ::," + * Where: + * - enum value from rmw_qos_reliability_policy_e. + * - enum value from rmw_qos_durability_policy_e. + * - enum value from rmw_qos_history_policy_e. + * - The depth number. + * For example, the liveliness substring for a topic with Reliability policy: reliable, + * Durability policy: volatile, History policy: keep_last, and depth: 10, would be + * "1:2:1,10". See rmw/types.h for the values of each policy enum. + */ +std::string qos_to_keyexpr(rmw_qos_profile_t qos); + +/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. +std::optional keyexpr_to_qos(const std::string & keyexpr); + + } // namespace liveliness #endif // DETAIL__LIVELINESS_UTILS_HPP_