Skip to content

Commit

Permalink
Cleanup some of the code in the liveliness.
Browse files Browse the repository at this point in the history
Also fix a couple of bugs where we were mixing up subscriptions
and publications, leading to incorrect counts.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Nov 27, 2023
1 parent 7f6fa1f commit 7c79ff1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 47 deletions.
67 changes: 30 additions & 37 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count)

///=============================================================================
TopicData::TopicData(
TopicInfo info,
liveliness::TopicInfo info,
TopicStats stats)
: info_(std::move(info)),
stats_(std::move(stats))
Expand All @@ -63,10 +63,9 @@ void GraphCache::parse_put(const std::string & keyexpr)
const auto & entity = *valid_entity;

// Helper lambda to append pub/subs to the GraphNode.
// We capture by reference to update caches like graph_topics_ if update_cache is true.
// We capture by reference to update graph_topics_
auto add_topic_data =
[&](const Entity & entity, GraphNode & graph_node,
bool update_cache = false) -> void
[this](const Entity & entity, GraphNode & graph_node) -> void
{
if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) {
return;
Expand Down Expand Up @@ -97,37 +96,35 @@ void GraphCache::parse_put(const std::string & keyexpr)
// the counters.
auto & existing_graph_topic = type_insertion.first->second;
existing_graph_topic->stats_.pub_count_ += pub_count;
existing_graph_topic->stats_.pub_count_ += sub_count;
existing_graph_topic->stats_.sub_count_ += sub_count;
}
}

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
if (update_cache) {
auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_);
if (cache_topic_it == graph_topics_.end()) {
// First time this topic name is added to the graph.
auto topic_data_ptr = std::make_shared<TopicData>(
// Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph
auto cache_topic_it = graph_topics_.find(entity.topic_info()->name_);
if (cache_topic_it == graph_topics_.end()) {
// First time this topic name is added to the graph.
auto topic_data_ptr = std::make_shared<TopicData>(
entity.topic_info().value(),
TopicStats{pub_count, sub_count}
);
graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{
{entity.topic_info()->type_, topic_data_ptr}
};
} else {
// If a TopicData entry for the same type exists in the topic map, update pub/sub counts
// or else create an new TopicData.
auto topic_data_insertion =
cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr));
if (topic_data_insertion.second) {
// A TopicData for the topic_type does not exist.
topic_data_insertion.first->second = std::make_shared<TopicData>(
entity.topic_info().value(),
TopicStats{pub_count, sub_count}
);
graph_topics_[entity.topic_info()->name_] = GraphNode::TopicDataMap{
{entity.topic_info()->type_, topic_data_ptr}
};
TopicStats{pub_count, sub_count});
} else {
// If a TopicData entry for the same type exists in the topic map, update pub/sub counts
// or else create an new TopicData.
auto topic_data_insertion =
cache_topic_it->second.insert(std::make_pair(entity.topic_info()->type_, nullptr));
if (topic_data_insertion.second) {
// A TopicData for the topic_type does not exist.
topic_data_insertion.first->second = std::make_shared<TopicData>(
entity.topic_info().value(),
TopicStats{pub_count, sub_count});
} else {
// Update the existing counters.
topic_data_insertion.first->second->stats_.pub_count_ += pub_count;
topic_data_insertion.first->second->stats_.sub_count_ += sub_count;
}
// Update the existing counters.
topic_data_insertion.first->second->stats_.pub_count_ += pub_count;
topic_data_insertion.first->second->stats_.sub_count_ += sub_count;
}
}

Expand All @@ -141,7 +138,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
graph_node.name_.c_str());
};

// Helper lambdas to convert an Entity into a GraphNode.
// Helper lambda to convert an Entity into a GraphNode.
auto make_graph_node =
[&](const Entity & entity) -> std::shared_ptr<GraphNode>
{
Expand All @@ -155,7 +152,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
return graph_node;
}
// Add pub/sub entries.
add_topic_data(entity, *graph_node, true);
add_topic_data(entity, *graph_node);

return graph_node;
};
Expand Down Expand Up @@ -205,7 +202,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
}

// Update the graph based on the entity.
add_topic_data(entity, *(node_it->second), true);
add_topic_data(entity, *(node_it->second));
}

///=============================================================================
Expand Down Expand Up @@ -558,11 +555,9 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::count_publishers(
const rmw_node_t * node,
const char * topic_name,
size_t * count) const
{
static_cast<void>(node);
*count = 0;
auto topic_it = graph_topics_.find(topic_name);
if (topic_it == graph_topics_.end()) {
Expand All @@ -578,11 +573,9 @@ rmw_ret_t GraphCache::count_publishers(

///=============================================================================
rmw_ret_t GraphCache::count_subscriptions(
const rmw_node_t * node,
const char * topic_name,
size_t * count) const
{
static_cast<void>(node);
*count = 0;
auto topic_it = graph_topics_.find(topic_name);
if (topic_it == graph_topics_.end()) {
Expand Down
7 changes: 2 additions & 5 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ struct TopicStats
// Constructor which initializes counters to 0.
TopicStats(std::size_t pub_count, std::size_t sub_count);
};
using TopicInfo = liveliness::TopicInfo;

///=============================================================================
struct TopicData
{
TopicInfo info_;
liveliness::TopicInfo info_;
TopicStats stats_;

TopicData(
TopicInfo info,
liveliness::TopicInfo info,
TopicStats stats);
};
using TopicDataPtr = std::shared_ptr<TopicData>;
Expand Down Expand Up @@ -94,12 +93,10 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t count_publishers(
const rmw_node_t * node,
const char * topic_name,
size_t * count) const;

rmw_ret_t count_subscriptions(
const rmw_node_t * node,
const char * topic_name,
size_t * count) const;

Expand Down
8 changes: 3 additions & 5 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ rmw_create_subscription(

// Publish to the graph that a new subscription is in town
const auto liveliness_entity = liveliness::Entity::make(
liveliness::EntityType::Publisher,
liveliness::EntityType::Subscription,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_subscription->topic_name,
sub_data->type_support->get_name(), "reliable"}
Expand Down Expand Up @@ -2173,8 +2173,7 @@ rmw_count_publishers(
}
RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT);

return node->context->impl->graph_cache.count_publishers(
node, topic_name, count);
return node->context->impl->graph_cache.count_publishers(topic_name, count);
}

//==============================================================================
Expand Down Expand Up @@ -2204,8 +2203,7 @@ rmw_count_subscribers(
}
RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT);

return node->context->impl->graph_cache.count_subscriptions(
node, topic_name, count);
return node->context->impl->graph_cache.count_subscriptions(topic_name, count);
}

//==============================================================================
Expand Down

0 comments on commit 7c79ff1

Please sign in to comment.