diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 8107a373..739d478e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -204,9 +204,52 @@ std::vector split_keyexpr( result.push_back(keyexpr.substr(delim_idx.back() + 1)); return result; } + +///============================================================================= +// An internal struct to bundle results of parsing a token. +struct TokenNode +{ + + struct TokenTopicData + { + std::string name_; + std::string type_; + std::string qos_; + + TokenTopicData( + std::string name, + std::string type, + std::string qos) + : name_(std::move(name)), + type_(std::move(type)), + qos_(std::move(qos)) + { + // Do nothing. + } + }; + + std::string ns_; + std::string name_; + std::string enclave_; + std::optional topic_data_; + + TokenNode( + std::string ns, + std::string name, + std::string enclave, + std::optional topic_data = std::nullopt) + : ns_(std::move(ns)), + name_(std::move(name)), + enclave_(std::move(enclave)), + topic_data_(std::move(topic_data)) + { + // Do nothing. + } +}; + ///============================================================================= // Convert a liveliness token into a -std::optional> _parse_token(const std::string & keyexpr) +std::optional> _parse_token(const std::string & keyexpr) { std::vector parts = split_keyexpr(keyexpr); // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). @@ -228,11 +271,20 @@ std::optional> _parse_token(const std::string // Get the entity, ie NN, MP, MS, SS, SC. std::string & entity = parts[2]; + if (entity != "NN" && entity != "MP" && entity != "MS" && entity != "SS" && entity != "SC") { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid entity [%s].", entity.c_str()); + return std::nullopt; + } - GraphNode node; + // TODO(Yadunund): Support enclaves. // Nodes with empty namespaces will contain a "_". - node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; - node.name = std::move(parts[4]); + TokenNode node{ + parts[3] == "_" ? "/" : "/" + parts[3], + std::move(parts[4]), + "" + }; if (entity != "NN") { if (parts.size() < 8) { @@ -241,19 +293,13 @@ std::optional> _parse_token(const std::string "Received invalid liveliness token"); return std::nullopt; } - GraphNode::TopicData data; - data.topic = "/" + std::move(parts[5]); - data.type = std::move(parts[6]); - data.qos = std::move(parts[7]); - - if (entity == "MP") { - node.pubs.push_back(std::move(data)); - } else if (entity == "MS") { - node.subs.push_back(std::move(data)); - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client + if (entity == "MP" || entity == "MS" || entity == "SS" || entity == "SC") { + TokenNode::TokenTopicData topic_data{ + "/" + std::move(parts[5]), + std::move(parts[6]), + std::move(parts[7]) + }; + node.topic_data_ = std::move(topic_data); } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -266,14 +312,26 @@ std::optional> _parse_token(const std::string } } // namespace + ///============================================================================= -GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) +TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) : pub_count_(pub_count), sub_count_(sub_count) { // Do nothing. } + +///============================================================================= +TopicData::TopicData( + std::string type, + std::string qos, + TopicStats stats) +: type_(std::move(type)), + qos_(std::move(qos)), + stats_(std::move(stats)) +{} + ///============================================================================= void GraphCache::parse_put(const std::string & keyexpr) { @@ -283,111 +341,129 @@ void GraphCache::parse_put(const std::string & keyexpr) return; } const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); - std::lock_guard lock(graph_mutex_); + const auto & token_node = valid_token->second; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update caches like graph_topics_ if update_cache is true. + auto add_topic_data = + [&](const TokenNode::TokenTopicData & topic_data, GraphNode & graph_node, + const std::string & entity, bool update_cache = false) -> void + { + if (entity != "MP" && entity != "MS") { + return; + } - if (entity == "NN") { - // Node - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // New namespace. - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + auto & topic_map = entity == "MP" ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity == "MP" ? "publisher" : "subscription"; + const std::size_t pub_count = entity == "MP" ? 1 : 0; + const std::size_t sub_count = !pub_count; + TopicDataPtr graph_topic_data = std::make_shared( + topic_data.type_, + topic_data.qos_, + TopicStats{pub_count, sub_count}); + + GraphNode::TopicDataSet topic_data_set = {graph_topic_data}; + auto insertion = topic_map.insert(std::make_pair(topic_data.name_, topic_data_set)); if (!insertion.second) { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", - node->name.c_str()); + // A topic with the same name already exists in the node so we append the type. + auto type_insertion = insertion.first->second.insert(graph_topic_data); + if (!type_insertion.second) { + // We have another instance of a pub/sub over the same topic and type so we increment the counters. + auto & existing_graph_topic = *(type_insertion.first); + existing_graph_topic->stats_.pub_count_ += pub_count; + existing_graph_topic->stats_.pub_count_ += sub_count; + } } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to the graph.", - node->name.c_str()); - return; - } else if (entity == "MP") { - // Publisher - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->pubs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(node->pubs[0]); - } else { - return; + // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. + if (update_cache) { + std::string topic_key = topic_data.name_ + "?" + topic_data.type_; + auto cache_insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); + if (cache_insertion.second) { + // First entry for this topic name + type combo so we create a new TopicStats instance. + cache_insertion.first->second = std::make_unique(pub_count, sub_count); + } else { + // Else we update the existing counters. + cache_insertion.first->second->pub_count_ += pub_count; + cache_insertion.first->second->sub_count_ += sub_count; + } } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->pub_count_; - } else { - insertion.first->second = std::make_unique(1, 0); - } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Added %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_data.name_.c_str(), + topic_data.type_.c_str(), + topic_data.qos_.c_str(), + graph_node.name_.c_str()); + }; + + // Helper lambdas to convert a TokenNode into a basic GraphNode. + auto make_graph_node = + [&](const TokenNode & token_node, const std::string & entity) -> std::shared_ptr + { + auto graph_node = std::make_shared(); + graph_node->ns_ = token_node.ns_; + graph_node->name_ = token_node.name_; + graph_node->enclave_ = token_node.enclave_; + + if (!token_node.topic_data_.has_value()) { + // Token was for a node. + return graph_node; + } + // Add pub/sub entries. + add_topic_data(token_node.topic_data_.value(), *graph_node, entity, true); + + return graph_node; + }; + + // Lock the graph mutex before accessing the graph. + std::lock_guard lock(graph_mutex_); + + // If the namespace did not exist, create it and add the node to the graph and return. + auto ns_it = graph_.find(token_node.ns_); + if (ns_it == graph_.end()) { + std::unordered_map node_map = { + {token_node.name_, make_graph_node(token_node, entity)}}; + graph_.insert(std::make_pair(token_node.ns_, std::move(node_map))); RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str()); + "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", + token_node.name_.c_str(), + token_node.ns_.c_str()); return; - } else if (entity == "MS") { - // Subscription - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->subs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->subs.push_back(node->subs[0]); - } else { - return; - } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->sub_count_; - } else { - insertion.first->second = std::make_unique(0, 1); - } + } + + // Add the node to the namespace if it did not exist and return. + auto node_it = ns_it->second.find(token_node.name_); + if (node_it == ns_it->second.end()) { + ns_it->second.insert(std::make_pair(token_node.name_, make_graph_node(token_node, entity))); RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str()); + "rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.", + token_node.name_.c_str(), + token_node.ns_.c_str()); return; - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunud): Service Client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); + } + + // Handles additions to an existing node in the graph. + if (entity == "NN") { + // The NN entity is implicitly handled above where we add the node. + // If control reaches here, then we received a duplicate entry for the same node. + // This could happen when we get() all the liveliness tokens when the node spins up and + // receive a MP token before an NN one. + return; + } + + if (!token_node.topic_data_.has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Put token %s parsed without extracting TopicData.", + keyexpr.c_str()); return; } + + // Update the graph based on the entity. + add_topic_data(token_node.topic_data_.value(), *(node_it->second), entity, true); } ///============================================================================= @@ -399,122 +475,136 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); + const auto & token_node = valid_token->second; + + // Helper lambda to append pub/subs to the GraphNode. + // We capture by reference to update caches like graph_topics_ if update_cache is true. + auto remove_topic_data = + [&](const TokenNode::TokenTopicData & topic_data, GraphNode & graph_node, + const std::string & entity, bool update_cache = false) -> void + { + if (entity != "MP" && entity != "MS") { + return; + } + + auto & topic_map = entity == "MP" ? graph_node.pubs_ : graph_node.subs_; + const std::string entity_desc = entity == "MP" ? "publisher" : "subscription"; + const std::size_t pub_count = entity == "MP" ? 1 : 0; + const std::size_t sub_count = !pub_count; + + auto topic_it = topic_map.find(topic_data.name_); + if (topic_it == topic_map.end()) { + // Pub/sub not found. + return; + } + + auto & topic_data_set = topic_it->second; + // Search the unordered_set for the TopicData for this topic. + auto topic_data_it = std::find_if( + topic_data_set.begin(), topic_data_set.end(), + [&topic_data](const auto topic_data_ptr) { + return topic_data.type_ == topic_data_ptr->type_; + }); + if (topic_data_it == topic_data_set.end()) { + // Something is wrong. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "TopicData not found for topic %s. Report this.", + topic_data.name_.c_str()); + return; + } + + // Decrement the relevant counters. Check if both counters are 0 and if so remove from graph_node. + auto & existing_topic_data = *topic_data_it; + existing_topic_data->stats_.pub_count_ -= pub_count; + existing_topic_data->stats_.sub_count_ -= sub_count; + if (existing_topic_data->stats_.pub_count_ == 0 && + existing_topic_data->stats_.sub_count_ == 0) + { + topic_data_set.erase(topic_data_it); + } + // If the topic does not have any TopicData entries, erase the topic from the map. + if (topic_data_set.empty()) { + topic_map.erase(topic_data.name_); + } + + // Bookkeeping: We update graph_topic_ which keeps track of topics across all nodes in the graph. + if (update_cache) { + std::string topic_key = topic_data.name_ + "?" + topic_data.type_; + auto cache_topic_it = graph_topics_.find(topic_key); + if (cache_topic_it == graph_topics_.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + topic_key.c_str()); + } else { + // Decrement the relevant counters. Check if both counters are 0 and if so remove from cache. + cache_topic_it->second->pub_count_ -= pub_count; + cache_topic_it->second->sub_count_ -= sub_count; + if (cache_topic_it->second->pub_count_ == 0 && cache_topic_it->second->sub_count_ == 0) { + graph_topics_.erase(topic_key); + } + } + } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "Removed %s on topic %s with type %s and qos %s to node /%s.", + entity_desc.c_str(), + topic_data.name_.c_str(), + topic_data.type_.c_str(), + topic_data.qos_.c_str(), + graph_node.name_.c_str()); + }; + + // Lock the graph mutex before accessing the graph. std::lock_guard lock(graph_mutex_); + + // If namespace does not exist, ignore the request. + auto ns_it = graph_.find(token_node.ns_); + if (ns_it == graph_.end()) { + return; + } + + // If the node does not exist, ignore the request. + auto node_it = ns_it->second.find(token_node.name_); + if (node_it == ns_it->second.end()) { + return; + } + if (entity == "NN") { // Node - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - ns_it->second.erase(node->name); + // The liveliness tokens to remove pub/subs should be received before the one to remove a node + // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in the node below, + // we should update the count in graph_topics_. + const auto graph_node = node_it->second; + if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token to remove node /%s from the graph before all pub/subs for this node have been removed. " + "Report this issue.", + token_node.name_.c_str() + ); + // TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in graph_topics_. } + ns_it->second.erase(token_node.name_); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Removed node /%s from the graph.", - node->name.c_str() + token_node.name_.c_str() ); - } else if (entity == "MP") { - // Publisher - if (node->pubs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of publishers and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = std::find_if( - found_node->pubs.begin(), found_node->pubs.end(), - [&node](const auto & pub) { - return pub.topic == node->pubs.at(0).topic && - pub.type == node->pubs.at(0).type && - pub.qos == node->pubs.at(0).qos; - }); - if (erase_it != found_node->pubs.end()) { - found_node->pubs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { - // The last publisher was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->pub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed publisher %s from node /%s in the graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "MS") { - // Subscription - if (node->subs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of subscriptions and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->subs.begin(); - for (; erase_it != found_node->subs.end(); ++erase_it) { - const auto & sub = *erase_it; - if (sub.topic == node->subs.at(0).topic && - sub.type == node->subs.at(0).type && - sub.qos == node->subs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->subs.end()) { - found_node->subs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { - // The last subscription was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->sub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed subscription %s from node /%s in the graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received liveliness token with invalid entity type."); return; } + + if (!token_node.topic_data_.has_value()) { + // Likely an error with parsing the token. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.", + keyexpr.c_str()); + return; + } + + // Update the graph based on the entity. + remove_topic_data(token_node.topic_data_.value(), *(node_it->second), entity, true); } ///============================================================================= @@ -602,7 +692,7 @@ rmw_ret_t GraphCache::get_node_names( const std::string & ns = ns_it->first; for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { const auto node = node_it->second; - node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + node_names->data[j] = rcutils_strdup(node->name_.c_str(), *allocator); if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } @@ -613,7 +703,7 @@ rmw_ret_t GraphCache::get_node_names( } if (enclaves) { enclaves->data[j] = rcutils_strdup( - node->enclave.c_str(), *allocator); + node->enclave_.c_str(), *allocator); if (!enclaves->data[j]) { return RMW_RET_BAD_ALLOC; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2de0018..b651a014 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "rcutils/allocator.h" @@ -75,23 +76,54 @@ class PublishToken const std::string & token); }; +///============================================================================= +struct TopicStats +{ + std::size_t pub_count_; + std::size_t sub_count_; + + // Constructor which initializes counters to 0. + TopicStats(std::size_t pub_count, std::size_t sub_count); +}; +using TopicStatsPtr = std::unique_ptr; + +///============================================================================= +struct TopicData +{ + std::string type_; + std::string qos_; + TopicStats stats_; + + TopicData( + std::string type, + std::string qos, + TopicStats stats); +}; +using TopicDataPtr = std::shared_ptr; + ///============================================================================= // TODO(Yadunund): Expand to services and clients. struct GraphNode { - struct TopicData + std::string ns_; + std::string name_; + // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? + std::string enclave_; + + // Hash topic data using "type" string to only support unique topic_types. + // TODO(Yadunund): Should we also factor the "qos" into the cache? + struct TopicDataHash { - std::string topic; - std::string type; - std::string qos; + std::size_t operator()(const TopicDataPtr & data) const + { + return std::hash{}(data->type_); + } }; - - std::string ns; - std::string name; - // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? - std::string enclave; - std::vector pubs; - std::vector subs; + // Map topic name to a set of TopicData to support multiple types per topic. + using TopicDataSet = std::unordered_set; + using TopicMap = std::unordered_map; + TopicMap pubs_ = {}; + TopicMap subs_ = {}; }; using GraphNodePtr = std::shared_ptr; @@ -140,16 +172,8 @@ class GraphCache final // Map namespace to a map of . std::unordered_map> graph_ = {}; - // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. - struct TopicStats - { - std::size_t pub_count_; - std::size_t sub_count_; - - // Constructor which initialized counters to 0. - TopicStats(std::size_t pub_count, std::size_t sub_count); - }; - using TopicStatsPtr = std::unique_ptr; + // Optimize topic lookups across the graph by mapping "topic_name?topic_type" keys to their pub/sub counts. + // TODO(Yadunund): Consider storing a set of NodePtrs for each key. std::unordered_map graph_topics_ = {}; mutable std::mutex graph_mutex_;