Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle duplicate node names by including zenoh session id in liveliness tokens #87

Merged
merged 6 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 113 additions & 52 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -156,6 +157,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
[&](const Entity & entity) -> std::shared_ptr<GraphNode>
{
auto graph_node = std::make_shared<GraphNode>();
graph_node->id_ = entity.id();
graph_node->ns_ = entity.node_namespace();
graph_node->name_ = entity.node_name();
graph_node->enclave_ = entity.node_enclave();
Expand Down Expand Up @@ -187,29 +189,55 @@ void GraphCache::parse_put(const std::string & keyexpr)
}

// Add the node to the namespace if it did not exist and return.
NodeMap::iterator node_it = ns_it->second.find(entity.node_name());
if (node_it == ns_it->second.end()) {
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity)));
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.node_namespace().c_str());
return;
// Case 1: First time a node with this name is added to the namespace.
// Case 2: There are one or more nodes with the same name but the entity could
// represent a node with the same name but a unique id which would make it a
// new addition to the graph.
std::pair<NodeMap::iterator, NodeMap::iterator> range = ns_it->second.equal_range(
entity.node_name());
NodeMap::iterator node_it = std::find_if(
range.first, range.second,
[&entity](const std::pair<std::string, GraphNodePtr> & node_it)
{
return entity.id() == node_it.second->id_;
});
if (node_it == range.second) {
// Either the first time a node with this name is added or with an existing
// name but unique id.
NodeMap::iterator insertion_it =
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity)));
if (insertion_it != ns_it->second.end()) {
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Added a new node /%s with id %s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
return;
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
return;
}
} else {
// The entity represents a node that already exists in the graph.
// Update topic info if required below.
}
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

// Handles additions to an existing node in the graph.
if (entity.type() == EntityType::Node) {
// The NN entity is implicitly handled above where we add the node.
// If control reaches here, then we received a duplicate entry for the same node.
// This could happen when we get() all the liveliness tokens when the node spins up and
// receive a MP token before an NN one.
// Creating a new node above would have also updated the graph with any topic info.
return;
}

if (!entity.topic_info().has_value()) {
// Likely an error with parsing the token.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.",
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info. Report this bug.",
keyexpr.c_str());
return;
}
Expand All @@ -228,11 +256,42 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
const liveliness::Entity & entity = *valid_entity;

// Helper lambda to update graph_topics_.
auto update_graph_topics =
[&](const liveliness::TopicInfo topic_info, std::size_t pub_count,
std::size_t sub_count) -> void
{
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
cache_topic_it->second.find(topic_info.type_);
if (cache_topic_data_it != cache_topic_it->second.end()) {
// Decrement the relevant counters. If both counters are 0 remove from cache.
cache_topic_data_it->second->stats_.pub_count_ -= pub_count;
cache_topic_data_it->second->stats_.sub_count_ -= sub_count;
if (cache_topic_data_it->second->stats_.pub_count_ == 0 &&
cache_topic_data_it->second->stats_.sub_count_ == 0)
{
cache_topic_it->second.erase(cache_topic_data_it);
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
}
}
}
};

// Helper lambda to append pub/subs to the GraphNode.
// We capture by reference to update caches like graph_topics_ if update_cache is true.
auto remove_topic_data =
[&](const Entity & entity, GraphNode & graph_node,
bool update_cache = false) -> void
[&](const Entity & entity, GraphNode & graph_node) -> void
{
if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) {
return;
Expand Down Expand Up @@ -288,33 +347,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
if (update_cache) {
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
cache_topic_it->second.find(topic_info.type_);
if (cache_topic_data_it != cache_topic_it->second.end()) {
// Decrement the relevant counters. If both counters are 0 remove from cache.
cache_topic_data_it->second->stats_.pub_count_ -= pub_count;
cache_topic_data_it->second->stats_.sub_count_ -= sub_count;
if (cache_topic_data_it->second->stats_.pub_count_ == 0 &&
cache_topic_data_it->second->stats_.sub_count_ == 0)
{
cache_topic_it->second.erase(cache_topic_data_it);
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
}
}
}
}
update_graph_topics(topic_info, pub_count, sub_count);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -336,8 +369,22 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// If the node does not exist, ignore the request.
NodeMap::iterator node_it = ns_it->second.find(entity.node_name());
if (node_it == ns_it->second.end()) {
std::pair<NodeMap::iterator, NodeMap::iterator> range = ns_it->second.equal_range(
entity.node_name());
NodeMap::iterator node_it = std::find_if(
range.first, range.second,
[&entity](const std::pair<std::string, GraphNodePtr> & node_it)
{
// An operator== overload is defined above.
return entity.id() == node_it.second->id_;
});
if (node_it == range.second) {
// Node does not exist.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove unknown node /%s from the graph. Ignoring...",
entity.node_name().c_str()
);
return;
}

Expand All @@ -351,13 +398,25 @@ void GraphCache::parse_del(const std::string & keyexpr)
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove node /%s from the graph before all pub/subs for this "
"node have been removed. Report this issue.",
"node have been removed. Removing all pub/subs first...",
entity.node_name().c_str()
);
// TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in
// graph_topics_.
auto remove_topics =
[&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void {
std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0;
std::size_t sub_count = !pub_count;
for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) {
for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end();
++type_it)
{
update_graph_topics(type_it->second->info_, pub_count, sub_count);
}
}
};
remove_topics(graph_node->pubs_, EntityType::Publisher);
remove_topics(graph_node->subs_, EntityType::Subscription);
}
ns_it->second.erase(entity.node_name());
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node /%s from the graph.",
Expand All @@ -368,14 +427,14 @@ void GraphCache::parse_del(const std::string & keyexpr)

if (!entity.topic_info().has_value()) {
// Likely an error with parsing the token.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.",
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData. Report this bug.",
keyexpr.c_str());
return;
}

// Update the graph based on the entity.
remove_topic_data(entity, *(node_it->second), true);
remove_topic_data(entity, *(node_it->second));
}

///=============================================================================
Expand Down Expand Up @@ -667,6 +726,8 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node(
}

// Check if node exists.
// Since NodeMap is a multimap, this will return the first node with the same
// name that is found.
NodeMap::const_iterator node_it = ns_it->second.find(node_name);
if (node_it == ns_it->second.end()) {
return RMW_RET_OK;
Expand Down
4 changes: 3 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ using TopicDataPtr = std::shared_ptr<TopicData>;
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{
std::string id_;
std::string ns_;
std::string name_;
// TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node?
Expand Down Expand Up @@ -131,7 +132,8 @@ class GraphCache final
node_n:
*/

using NodeMap = std::unordered_map<std::string, GraphNodePtr>;
// We rely on a multimap to store nodes with duplicate names.
using NodeMap = std::multimap<std::string, GraphNodePtr>;
using NamespaceMap = std::unordered_map<std::string, NodeMap>;
// Map namespace to a map of <node_name, GraphNodePtr>.
NamespaceMap graph_ = {};
Expand Down
Loading
Loading