From d6ef8ac126dc925237ac37413e36c279637b3cd8 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 21 Dec 2023 17:17:21 +0800 Subject: [PATCH] Switch to multimap for NodeMap Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 175 ++++++++++++------ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 13 +- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 19 +- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 1 + 4 files changed, 150 insertions(+), 58 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index bcd48a76..68c963ec 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -54,6 +55,16 @@ TopicData::TopicData( stats_(std::move(stats)) {} +///============================================================================= +// bool operator==(const GraphNodePtr & lhs, const GraphNodePtr & rhs) { +// return lhs->id_ == rhs->id_ && lhs->ns_ == rhs->ns_ && lhs->name_ == rhs->name_; +// } + +///============================================================================= +// bool GraphCache::NodeComparator::operator()(const GraphNodePtr & lhs, const GraphNodePtr & rhs) const { +// return std::hash{}(lhs->id_) < std::hash{}(rhs->id_); +// } + ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { @@ -156,6 +167,7 @@ void GraphCache::parse_put(const std::string & keyexpr) [&](const Entity & entity) -> std::shared_ptr { auto graph_node = std::make_shared(); + graph_node->id_ = entity.id(); graph_node->ns_ = entity.node_namespace(); graph_node->name_ = entity.node_name(); graph_node->enclave_ = entity.node_enclave(); @@ -187,29 +199,55 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Add the node to the namespace if it did not exist and return. - NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); - if (node_it == ns_it->second.end()) { - ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", - entity.node_name().c_str(), - entity.node_namespace().c_str()); - return; + // Case 1: First time a node with this name is added to the namespace. + // Case 2: There are one or more nodes with the same name but the entity could + // represent a node with the same name but a unique id which would make it a + // new addition to the graph. + std::pair range = ns_it->second.equal_range( + entity.node_name()); + NodeMap::iterator node_it = std::find_if( + range.first, range.second, + [&entity](const std::pair & node_it) + { + return entity.id() == node_it.second->id_; + }); + if (node_it == range.second) { + // Either the first time a node with this name is added or with an existing + // name but unique id. + NodeMap::iterator insertion_it = + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); + if (insertion_it != ns_it->second.end()) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Added a new node /%s with id %s to an existing namespace %s in the graph.", + entity.node_name().c_str(), + entity.id().c_str(), + entity.node_namespace().c_str()); + return; + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.", + entity.node_name().c_str(), + entity.id().c_str(), + entity.node_namespace().c_str()); + return; + } + } else { + // The entity represents a node that already exists in the graph. + // Update topic info if required below. } // Handles additions to an existing node in the graph. if (entity.type() == EntityType::Node) { - // The NN entity is implicitly handled above where we add the node. - // If control reaches here, then we received a duplicate entry for the same node. - // This could happen when we get() all the liveliness tokens when the node spins up and - // receive a MP token before an NN one. + // Creating a new node above would have also updated the graph with any topic info. return; } if (!entity.topic_info().has_value()) { // Likely an error with parsing the token. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.", + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info. Report this bug.", keyexpr.c_str()); return; } @@ -228,11 +266,42 @@ void GraphCache::parse_del(const std::string & keyexpr) } const liveliness::Entity & entity = *valid_entity; + // Helper lambda to update graph_topics_. + auto update_graph_topics = + [&](const liveliness::TopicInfo topic_info, std::size_t pub_count, + std::size_t sub_count) -> void + { + GraphNode::TopicMap::iterator cache_topic_it = + graph_topics_.find(topic_info.name_); + if (cache_topic_it == graph_topics_.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + topic_info.name_.c_str()); + } else { + GraphNode::TopicDataMap::iterator cache_topic_data_it = + cache_topic_it->second.find(topic_info.type_); + if (cache_topic_data_it != cache_topic_it->second.end()) { + // Decrement the relevant counters. If both counters are 0 remove from cache. + cache_topic_data_it->second->stats_.pub_count_ -= pub_count; + cache_topic_data_it->second->stats_.sub_count_ -= sub_count; + if (cache_topic_data_it->second->stats_.pub_count_ == 0 && + cache_topic_data_it->second->stats_.sub_count_ == 0) + { + cache_topic_it->second.erase(cache_topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (cache_topic_it->second.empty()) { + graph_topics_.erase(cache_topic_it); + } + } + } + }; + // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto remove_topic_data = - [&](const Entity & entity, GraphNode & graph_node, - bool update_cache = false) -> void + [&](const Entity & entity, GraphNode & graph_node) -> void { if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { return; @@ -288,33 +357,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - if (update_cache) { - GraphNode::TopicMap::iterator cache_topic_it = - graph_topics_.find(topic_info.name_); - if (cache_topic_it == graph_topics_.end()) { - // This should not happen. - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", - topic_info.name_.c_str()); - } else { - GraphNode::TopicDataMap::iterator cache_topic_data_it = - cache_topic_it->second.find(topic_info.type_); - if (cache_topic_data_it != cache_topic_it->second.end()) { - // Decrement the relevant counters. If both counters are 0 remove from cache. - cache_topic_data_it->second->stats_.pub_count_ -= pub_count; - cache_topic_data_it->second->stats_.sub_count_ -= sub_count; - if (cache_topic_data_it->second->stats_.pub_count_ == 0 && - cache_topic_data_it->second->stats_.sub_count_ == 0) - { - cache_topic_it->second.erase(cache_topic_data_it); - } - // If the topic does not have any TopicData entries, erase the topic from the map. - if (cache_topic_it->second.empty()) { - graph_topics_.erase(cache_topic_it); - } - } - } - } + update_graph_topics(topic_info, pub_count, sub_count); RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -336,8 +379,22 @@ void GraphCache::parse_del(const std::string & keyexpr) } // If the node does not exist, ignore the request. - NodeMap::iterator node_it = ns_it->second.find(entity.node_name()); - if (node_it == ns_it->second.end()) { + std::pair range = ns_it->second.equal_range( + entity.node_name()); + NodeMap::iterator node_it = std::find_if( + range.first, range.second, + [&entity](const std::pair & node_it) + { + // An operator== overload is defined above. + return entity.id() == node_it.second->id_; + }); + if (node_it == range.second) { + // Node does not exist. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token to remove unknown node /%s from the graph. Ignoring...", + entity.node_name().c_str() + ); return; } @@ -351,13 +408,25 @@ void GraphCache::parse_del(const std::string & keyexpr) RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Received liveliness token to remove node /%s from the graph before all pub/subs for this " - "node have been removed. Report this issue.", + "node have been removed. Removing all pub/subs first...", entity.node_name().c_str() ); - // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in - // graph_topics_. + auto remove_topics = + [&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void { + std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0; + std::size_t sub_count = !pub_count; + for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) { + for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); + ++type_it) + { + update_graph_topics(type_it->second->info_, pub_count, sub_count); + } + } + }; + remove_topics(graph_node->pubs_, EntityType::Publisher); + remove_topics(graph_node->subs_, EntityType::Subscription); } - ns_it->second.erase(entity.node_name()); + ns_it->second.erase(node_it); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", @@ -368,14 +437,14 @@ void GraphCache::parse_del(const std::string & keyexpr) if (!entity.topic_info().has_value()) { // Likely an error with parsing the token. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData. Report this bug.", keyexpr.c_str()); return; } // Update the graph based on the entity. - remove_topic_data(entity, *(node_it->second), true); + remove_topic_data(entity, *(node_it->second)); } ///============================================================================= @@ -667,6 +736,8 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node( } // Check if node exists. + // Since NodeMap is a multimap, this will return the first node with the same + // name that is found. NodeMap::const_iterator node_it = ns_it->second.find(node_name); if (node_it == ns_it->second.end()) { return RMW_RET_OK; diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 3d0005f2..f0fffca8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -58,6 +58,7 @@ using TopicDataPtr = std::shared_ptr; // TODO(Yadunund): Expand to services and clients. struct GraphNode { + std::string id_; std::string ns_; std::string name_; // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? @@ -71,6 +72,9 @@ struct GraphNode TopicMap subs_ = {}; }; using GraphNodePtr = std::shared_ptr; +// Two nodes are the same if their id, name and namespaces are identical. +// TODO(Yadunund): Once zenoh API gives us globally unique IDs for entities, rely on only the id. +// bool operator==(const GraphNodePtr & lhs, const GraphNodePtr & rhs); ///============================================================================= class GraphCache final @@ -131,7 +135,14 @@ class GraphCache final node_n: */ - using NodeMap = std::unordered_map; + // // A comparator for sorting nodes that have the same name based on their + // // zenoh session ids. + // struct NodeComparator { + // bool operator()(const GraphNodePtr & lhs, const GraphNodePtr & rhs) const; + // }; + + // We rely on a multimap to store nodes with duplicate names. + using NodeMap = std::multimap; using NamespaceMap = std::unordered_map; // Map namespace to a map of . NamespaceMap graph_ = {}; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index cb92699d..7fae66f8 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -83,14 +83,15 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; -std::string convert(z_id_t id) { +std::string convert(z_id_t id) +{ std::stringstream ss; ss << std::hex; size_t i = 0; for (; i < (sizeof(id.id) - 1); i++) { ss << static_cast(id.id[i]) << "."; } - ss << static_cast(id.id[i]) << std::dec; + ss << static_cast(id.id[i]); return ss.str(); } @@ -109,7 +110,8 @@ Entity::Entity( EntityType type, NodeInfo node_info, std::optional topic_info) -: type_(std::move(type)), +: id_(std::move(id)), + type_(std::move(type)), node_info_(std::move(node_info)), topic_info_(std::move(topic_info)) { @@ -132,7 +134,8 @@ Entity::Entity( */ std::stringstream token_ss; const std::string & ns = node_info_.ns_; - token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << id << "/" << entity_to_str.at(type_) << ns; + token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << id_ << "/" << entity_to_str.at( + type_) << ns; // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". // Hence we add an "_" to denote an empty namespace such that splitting the key // will always result in 5 parts. @@ -254,7 +257,7 @@ std::optional Entity::make(const std::string & keyexpr) EntityType entity_type = entity_it->second; std::size_t domain_id = std::stoul(parts[1]); - std::string& id = parts[2]; + std::string & id = parts[2]; std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]); std::string node_name = std::move(parts[5]); std::optional topic_info = std::nullopt; @@ -280,6 +283,12 @@ std::optional Entity::make(const std::string & keyexpr) std::move(topic_info)}; } +///============================================================================= +std::string Entity::id() const +{ + return this->id_; +} + ///============================================================================= EntityType Entity::type() const { diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 54e037ea..13e09cb4 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -111,6 +111,7 @@ class Entity NodeInfo node_info, std::optional topic_info); + std::string id_; EntityType type_; NodeInfo node_info_; std::optional topic_info_;