From 7c79ff1f17707eda7f9ae1396439c441fefcd7e7 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 27 Nov 2023 16:55:23 +0000 Subject: [PATCH] Cleanup some of the code in the liveliness. Also fix a couple of bugs where we were mixing up subscriptions and publications, leading to incorrect counts. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 67 +++++++++++------------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 7 +-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 ++- 3 files changed, 35 insertions(+), 47 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index c80a4421..b4e82dd9 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -46,7 +46,7 @@ TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) ///============================================================================= TopicData::TopicData( - TopicInfo info, + liveliness::TopicInfo info, TopicStats stats) : info_(std::move(info)), stats_(std::move(stats)) @@ -63,10 +63,9 @@ void GraphCache::parse_put(const std::string & keyexpr) const auto & entity = *valid_entity; // Helper lambda to append pub/subs to the GraphNode. - // We capture by reference to update caches like graph_topics_ if update_cache is true. + // We capture by reference to update graph_topics_ auto add_topic_data = - [&](const Entity & entity, GraphNode & graph_node, - bool update_cache = false) -> void + [this](const Entity & entity, GraphNode & graph_node) -> void { if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { return; @@ -97,37 +96,35 @@ void GraphCache::parse_put(const std::string & keyexpr) // the counters. auto & existing_graph_topic = type_insertion.first->second; existing_graph_topic->stats_.pub_count_ += pub_count; - existing_graph_topic->stats_.pub_count_ += sub_count; + existing_graph_topic->stats_.sub_count_ += sub_count; } } - // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - if (update_cache) { - auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); - if (cache_topic_it == graph_topics_.end()) { - // First time this topic name is added to the graph. - auto topic_data_ptr = std::make_shared( + // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph + auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_); + if (cache_topic_it == graph_topics_.end()) { + // First time this topic name is added to the graph. + auto topic_data_ptr = std::make_shared( + entity.topic_info().value(), + TopicStats{pub_count, sub_count} + ); + graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{ + {entity.topic_info()->type_, topic_data_ptr} + }; + } else { + // If a TopicData entry for the same type exists in the topic map, update pub/sub counts + // or else create an new TopicData. + auto topic_data_insertion = + cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); + if (topic_data_insertion.second) { + // A TopicData for the topic_type does not exist. + topic_data_insertion.first->second = std::make_shared( entity.topic_info().value(), - TopicStats{pub_count, sub_count} - ); - graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{ - {entity.topic_info()->type_, topic_data_ptr} - }; + TopicStats{pub_count, sub_count}); } else { - // If a TopicData entry for the same type exists in the topic map, update pub/sub counts - // or else create an new TopicData. - auto topic_data_insertion = - cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr)); - if (topic_data_insertion.second) { - // A TopicData for the topic_type does not exist. - topic_data_insertion.first->second = std::make_shared( - entity.topic_info().value(), - TopicStats{pub_count, sub_count}); - } else { - // Update the existing counters. - topic_data_insertion.first->second->stats_.pub_count_ += pub_count; - topic_data_insertion.first->second->stats_.sub_count_ += sub_count; - } + // Update the existing counters. + topic_data_insertion.first->second->stats_.pub_count_ += pub_count; + topic_data_insertion.first->second->stats_.sub_count_ += sub_count; } } @@ -141,7 +138,7 @@ void GraphCache::parse_put(const std::string & keyexpr) graph_node.name_.c_str()); }; - // Helper lambdas to convert an Entity into a GraphNode. + // Helper lambda to convert an Entity into a GraphNode. auto make_graph_node = [&](const Entity & entity) -> std::shared_ptr { @@ -155,7 +152,7 @@ void GraphCache::parse_put(const std::string & keyexpr) return graph_node; } // Add pub/sub entries. - add_topic_data(entity, *graph_node, true); + add_topic_data(entity, *graph_node); return graph_node; }; @@ -205,7 +202,7 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Update the graph based on the entity. - add_topic_data(entity, *(node_it->second), true); + add_topic_data(entity, *(node_it->second)); } ///============================================================================= @@ -558,11 +555,9 @@ rmw_ret_t GraphCache::get_topic_names_and_types( ///============================================================================= rmw_ret_t GraphCache::count_publishers( - const rmw_node_t * node, const char * topic_name, size_t * count) const { - static_cast(node); *count = 0; auto topic_it = graph_topics_.find(topic_name); if (topic_it == graph_topics_.end()) { @@ -578,11 +573,9 @@ rmw_ret_t GraphCache::count_publishers( ///============================================================================= rmw_ret_t GraphCache::count_subscriptions( - const rmw_node_t * node, const char * topic_name, size_t * count) const { - static_cast(node); *count = 0; auto topic_it = graph_topics_.find(topic_name); if (topic_it == graph_topics_.end()) { diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index d9010711..bec68400 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -41,16 +41,15 @@ struct TopicStats // Constructor which initializes counters to 0. TopicStats(std::size_t pub_count, std::size_t sub_count); }; -using TopicInfo = liveliness::TopicInfo; ///============================================================================= struct TopicData { - TopicInfo info_; + liveliness::TopicInfo info_; TopicStats stats_; TopicData( - TopicInfo info, + liveliness::TopicInfo info, TopicStats stats); }; using TopicDataPtr = std::shared_ptr; @@ -94,12 +93,10 @@ class GraphCache final rmw_names_and_types_t * topic_names_and_types) const; rmw_ret_t count_publishers( - const rmw_node_t * node, const char * topic_name, size_t * count) const; rmw_ret_t count_subscriptions( - const rmw_node_t * node, const char * topic_name, size_t * count) const; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index ed957fdd..4fa0ebbb 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1270,7 +1270,7 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town const auto liveliness_entity = liveliness::Entity::make( - liveliness::EntityType::Publisher, + liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, sub_data->type_support->get_name(), "reliable"} @@ -2173,8 +2173,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_publishers( - node, topic_name, count); + return node->context->impl->graph_cache.count_publishers(topic_name, count); } //============================================================================== @@ -2204,8 +2203,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_subscriptions( - node, topic_name, count); + return node->context->impl->graph_cache.count_subscriptions(topic_name, count); } //==============================================================================