Skip to content

Commit

Permalink
Switch to multimap for NodeMap
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Dec 21, 2023
1 parent eb6e066 commit d6ef8ac
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 58 deletions.
175 changes: 123 additions & 52 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -54,6 +55,16 @@ TopicData::TopicData(
stats_(std::move(stats))
{}

///=============================================================================
// bool operator==(const GraphNodePtr & lhs, const GraphNodePtr & rhs) {
// return lhs->id_ == rhs->id_ && lhs->ns_ == rhs->ns_ && lhs->name_ == rhs->name_;
// }

///=============================================================================
// bool GraphCache::NodeComparator::operator()(const GraphNodePtr & lhs, const GraphNodePtr & rhs) const {
// return std::hash<std::string>{}(lhs->id_) < std::hash<std::string>{}(rhs->id_);
// }

///=============================================================================
void GraphCache::parse_put(const std::string & keyexpr)
{
Expand Down Expand Up @@ -156,6 +167,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
[&](const Entity & entity) -> std::shared_ptr<GraphNode>
{
auto graph_node = std::make_shared<GraphNode>();
graph_node->id_ = entity.id();
graph_node->ns_ = entity.node_namespace();
graph_node->name_ = entity.node_name();
graph_node->enclave_ = entity.node_enclave();
Expand Down Expand Up @@ -187,29 +199,55 @@ void GraphCache::parse_put(const std::string & keyexpr)
}

// Add the node to the namespace if it did not exist and return.
NodeMap::iterator node_it = ns_it->second.find(entity.node_name());
if (node_it == ns_it->second.end()) {
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity)));
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node /%s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.node_namespace().c_str());
return;
// Case 1: First time a node with this name is added to the namespace.
// Case 2: There are one or more nodes with the same name but the entity could
// represent a node with the same name but a unique id which would make it a
// new addition to the graph.
std::pair<NodeMap::iterator, NodeMap::iterator> range = ns_it->second.equal_range(
entity.node_name());
NodeMap::iterator node_it = std::find_if(
range.first, range.second,
[&entity](const std::pair<std::string, GraphNodePtr> & node_it)
{
return entity.id() == node_it.second->id_;
});
if (node_it == range.second) {
// Either the first time a node with this name is added or with an existing
// name but unique id.
NodeMap::iterator insertion_it =
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity)));
if (insertion_it != ns_it->second.end()) {
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Added a new node /%s with id %s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
return;
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
return;
}
} else {
// The entity represents a node that already exists in the graph.
// Update topic info if required below.
}

// Handles additions to an existing node in the graph.
if (entity.type() == EntityType::Node) {
// The NN entity is implicitly handled above where we add the node.
// If control reaches here, then we received a duplicate entry for the same node.
// This could happen when we get() all the liveliness tokens when the node spins up and
// receive a MP token before an NN one.
// Creating a new node above would have also updated the graph with any topic info.
return;
}

if (!entity.topic_info().has_value()) {
// Likely an error with parsing the token.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info.",
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info. Report this bug.",
keyexpr.c_str());
return;
}
Expand All @@ -228,11 +266,42 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
const liveliness::Entity & entity = *valid_entity;

// Helper lambda to update graph_topics_.
auto update_graph_topics =
[&](const liveliness::TopicInfo topic_info, std::size_t pub_count,
std::size_t sub_count) -> void
{
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
cache_topic_it->second.find(topic_info.type_);
if (cache_topic_data_it != cache_topic_it->second.end()) {
// Decrement the relevant counters. If both counters are 0 remove from cache.
cache_topic_data_it->second->stats_.pub_count_ -= pub_count;
cache_topic_data_it->second->stats_.sub_count_ -= sub_count;
if (cache_topic_data_it->second->stats_.pub_count_ == 0 &&
cache_topic_data_it->second->stats_.sub_count_ == 0)
{
cache_topic_it->second.erase(cache_topic_data_it);
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
}
}
}
};

// Helper lambda to append pub/subs to the GraphNode.
// We capture by reference to update caches like graph_topics_ if update_cache is true.
auto remove_topic_data =
[&](const Entity & entity, GraphNode & graph_node,
bool update_cache = false) -> void
[&](const Entity & entity, GraphNode & graph_node) -> void
{
if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) {
return;
Expand Down Expand Up @@ -288,33 +357,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
if (update_cache) {
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
cache_topic_it->second.find(topic_info.type_);
if (cache_topic_data_it != cache_topic_it->second.end()) {
// Decrement the relevant counters. If both counters are 0 remove from cache.
cache_topic_data_it->second->stats_.pub_count_ -= pub_count;
cache_topic_data_it->second->stats_.sub_count_ -= sub_count;
if (cache_topic_data_it->second->stats_.pub_count_ == 0 &&
cache_topic_data_it->second->stats_.sub_count_ == 0)
{
cache_topic_it->second.erase(cache_topic_data_it);
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
}
}
}
}
update_graph_topics(topic_info, pub_count, sub_count);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -336,8 +379,22 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// If the node does not exist, ignore the request.
NodeMap::iterator node_it = ns_it->second.find(entity.node_name());
if (node_it == ns_it->second.end()) {
std::pair<NodeMap::iterator, NodeMap::iterator> range = ns_it->second.equal_range(
entity.node_name());
NodeMap::iterator node_it = std::find_if(
range.first, range.second,
[&entity](const std::pair<std::string, GraphNodePtr> & node_it)
{
// An operator== overload is defined above.
return entity.id() == node_it.second->id_;
});
if (node_it == range.second) {
// Node does not exist.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove unknown node /%s from the graph. Ignoring...",
entity.node_name().c_str()
);
return;
}

Expand All @@ -351,13 +408,25 @@ void GraphCache::parse_del(const std::string & keyexpr)
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token to remove node /%s from the graph before all pub/subs for this "
"node have been removed. Report this issue.",
"node have been removed. Removing all pub/subs first...",
entity.node_name().c_str()
);
// TODO(Yadunund): Iterate through the nodes pubs_ and subs_ and decrement topic count in
// graph_topics_.
auto remove_topics =
[&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void {
std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0;
std::size_t sub_count = !pub_count;
for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) {
for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end();
++type_it)
{
update_graph_topics(type_it->second->info_, pub_count, sub_count);
}
}
};
remove_topics(graph_node->pubs_, EntityType::Publisher);
remove_topics(graph_node->subs_, EntityType::Subscription);
}
ns_it->second.erase(entity.node_name());
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node /%s from the graph.",
Expand All @@ -368,14 +437,14 @@ void GraphCache::parse_del(const std::string & keyexpr)

if (!entity.topic_info().has_value()) {
// Likely an error with parsing the token.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData.",
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData. Report this bug.",
keyexpr.c_str());
return;
}

// Update the graph based on the entity.
remove_topic_data(entity, *(node_it->second), true);
remove_topic_data(entity, *(node_it->second));
}

///=============================================================================
Expand Down Expand Up @@ -667,6 +736,8 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node(
}

// Check if node exists.
// Since NodeMap is a multimap, this will return the first node with the same
// name that is found.
NodeMap::const_iterator node_it = ns_it->second.find(node_name);
if (node_it == ns_it->second.end()) {
return RMW_RET_OK;
Expand Down
13 changes: 12 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ using TopicDataPtr = std::shared_ptr<TopicData>;
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{
std::string id_;
std::string ns_;
std::string name_;
// TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node?
Expand All @@ -71,6 +72,9 @@ struct GraphNode
TopicMap subs_ = {};
};
using GraphNodePtr = std::shared_ptr<GraphNode>;
// Two nodes are the same if their id, name and namespaces are identical.
// TODO(Yadunund): Once zenoh API gives us globally unique IDs for entities, rely on only the id.
// bool operator==(const GraphNodePtr & lhs, const GraphNodePtr & rhs);

///=============================================================================
class GraphCache final
Expand Down Expand Up @@ -131,7 +135,14 @@ class GraphCache final
node_n:
*/

using NodeMap = std::unordered_map<std::string, GraphNodePtr>;
// // A comparator for sorting nodes that have the same name based on their
// // zenoh session ids.
// struct NodeComparator {
// bool operator()(const GraphNodePtr & lhs, const GraphNodePtr & rhs) const;
// };

// We rely on a multimap to store nodes with duplicate names.
using NodeMap = std::multimap<std::string, GraphNodePtr>;
using NamespaceMap = std::unordered_map<std::string, NodeMap>;
// Map namespace to a map of <node_name, GraphNodePtr>.
NamespaceMap graph_ = {};
Expand Down
19 changes: 14 additions & 5 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ static const std::unordered_map<std::string, EntityType> str_to_entity = {
{CLI_STR, EntityType::Client}
};

std::string convert(z_id_t id) {
std::string convert(z_id_t id)
{
std::stringstream ss;
ss << std::hex;
size_t i = 0;
for (; i < (sizeof(id.id) - 1); i++) {
ss << static_cast<int>(id.id[i]) << ".";
}
ss << static_cast<int>(id.id[i]) << std::dec;
ss << static_cast<int>(id.id[i]);
return ss.str();
}

Expand All @@ -109,7 +110,8 @@ Entity::Entity(
EntityType type,
NodeInfo node_info,
std::optional<TopicInfo> topic_info)
: type_(std::move(type)),
: id_(std::move(id)),
type_(std::move(type)),
node_info_(std::move(node_info)),
topic_info_(std::move(topic_info))
{
Expand All @@ -132,7 +134,8 @@ Entity::Entity(
*/
std::stringstream token_ss;
const std::string & ns = node_info_.ns_;
token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << id << "/" << entity_to_str.at(type_) << ns;
token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << id_ << "/" << entity_to_str.at(
type_) << ns;
// An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//".
// Hence we add an "_" to denote an empty namespace such that splitting the key
// will always result in 5 parts.
Expand Down Expand Up @@ -254,7 +257,7 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)

EntityType entity_type = entity_it->second;
std::size_t domain_id = std::stoul(parts[1]);
std::string& id = parts[2];
std::string & id = parts[2];
std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]);
std::string node_name = std::move(parts[5]);
std::optional<TopicInfo> topic_info = std::nullopt;
Expand All @@ -280,6 +283,12 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
std::move(topic_info)};
}

///=============================================================================
std::string Entity::id() const
{
return this->id_;
}

///=============================================================================
EntityType Entity::type() const
{
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Entity
NodeInfo node_info,
std::optional<TopicInfo> topic_info);

std::string id_;
EntityType type_;
NodeInfo node_info_;
std::optional<TopicInfo> topic_info_;
Expand Down

0 comments on commit d6ef8ac

Please sign in to comment.