Skip to content

Commit

Permalink
Fix types and type mangling.
Browse files Browse the repository at this point in the history
When playing around with this I found that the types
weren't being properly filled in, and also weren't being
demangled.  Fix both of those things here, as well
as do a lot of cleanup.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Nov 17, 2023
1 parent 428cace commit f7711c7
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 134 deletions.
146 changes: 75 additions & 71 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>
#include <vector>

#include "rcpputils/find_and_replace.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rcutils/logging_macros.h"
Expand All @@ -38,6 +39,19 @@ std::string GenerateToken::liveliness(size_t domain_id)
}

///=============================================================================
/**
* Generate a liveliness token for the particular entity.
*
* The liveliness tokens are in the form:
*
* @ros2_lv/<domainid>/<entity>/<namespace>/<nodename>
*
* Where:
* <domainid> - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS.
* <entity> - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client.
* <namespace> - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons.
* <nodename> - The ROS node name for this entity.
*/
static std::string generate_base_token(
const std::string & entity,
size_t domain_id,
Expand All @@ -46,7 +60,7 @@ static std::string generate_base_token(
{
std::stringstream token_ss;
token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_;
// An empty namespace from rcl will contain "/"" but zenoh does not allow keys with "//".
// An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//".
// Hence we add an "_" to denote an empty namespace such that splitting the key
// will always result in 5 parts.
if (namespace_ == "/") {
Expand Down Expand Up @@ -79,7 +93,6 @@ std::string GenerateToken::publisher(
{
std::string token = generate_base_token("MP", domain_id, node_namespace, node_name);
token += topic + "/" + type + "/" + qos;
printf("GenerateToken::Publisher: %s\n", token.c_str());
return token;
}

Expand All @@ -94,7 +107,6 @@ std::string GenerateToken::subscription(
{
std::string token = generate_base_token("MS", domain_id, node_namespace, node_name);
token += topic + "/" + type + "/" + qos;
printf("GenerateToken::Subscription: %s\n", token.c_str());
return token;
}

Expand Down Expand Up @@ -213,23 +225,12 @@ std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string
}
}

// Get the entity, ie N, MP, MS, SS, SC.
// Get the entity, ie NN, MP, MS, SS, SC.
std::string & entity = parts[2];
if (entity != "NN" &&
entity != "MP" &&
entity != "MS" &&
entity != "SS" &&
entity != "SC")
{
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid entity [%s] in liveliness token", entity.c_str());
return std::nullopt;
}

GraphNode node;
// Nodes with empty namespaces will contain a "_".
node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3);
node.ns = parts[3] == "_" ? "/" : "/" + parts[3];
node.name = std::move(parts[4]);

if (entity != "NN") {
Expand All @@ -248,14 +249,21 @@ std::optional<std::pair<std::string, GraphNode>> _parse_token(const std::string
node.pubs.push_back(std::move(data));
} else if (entity == "MS") {
node.subs.push_back(std::move(data));
} else if (entity == "SS") {
// TODO(yadunund): Service server
} else if (entity == "SC") {
// TODO(yadunund): Service client
} else {
// TODO.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid entity [%s] in liveliness token", entity.c_str());
return std::nullopt;
}
}

return std::make_pair(std::move(entity), std::move(node));
}
} // namespace anonymous
} // namespace

///=============================================================================
GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count)
Expand All @@ -282,11 +290,10 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto ns_it = graph_.find(node->ns);
if (ns_it == graph_.end()) {
// New namespace.
std::string ns = node->ns;
std::unordered_map<std::string, GraphNodePtr> map = {
{node->name, node}
};
graph_.insert(std::make_pair(std::move(ns), std::move(map)));
graph_.insert(std::make_pair(std::move(node->ns), std::move(map)));
} else {
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second) {
Expand Down Expand Up @@ -371,9 +378,9 @@ void GraphCache::parse_put(const std::string & keyexpr)
node->name.c_str());
return;
} else if (entity == "SS") {
// Service
// TODO(yadunund): Service server
} else if (entity == "SC") {
// Client
// TODO(yadunud): Service Client
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -501,9 +508,9 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
}
} else if (entity == "SS") {
// Service
// TODO(yadunund): Service server
} else if (entity == "SC") {
// Client
// TODO(yadunund): Service client
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -539,10 +546,6 @@ rmw_ret_t GraphCache::get_node_names(
for (auto it = graph_.begin(); it != graph_.end(); ++it) {
nodes_number += it->second.size();
}
// TODO(Yadunund): Delete.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"nodes_number %ld", nodes_number);

rcutils_ret_t rcutils_ret =
rcutils_string_array_init(node_names, nodes_number, allocator);
Expand Down Expand Up @@ -630,6 +633,33 @@ rmw_ret_t GraphCache::get_node_names(
return RMW_RET_OK;
}

namespace
{
// Shamelessly copied from https://github.com/ros2/rmw_cyclonedds/blob/f7f67bdef82f59558366aa6ce94ef9af3c5ab569/rmw_cyclonedds_cpp/src/demangle.cpp#L67
std::string
_demangle_if_ros_type(const std::string & dds_type_string)
{
if (dds_type_string[dds_type_string.size() - 1] != '_') {
// not a ROS type
return dds_type_string;
}

std::string substring = "dds_::";
size_t substring_position = dds_type_string.find(substring);
if (substring_position == std::string::npos) {
// not a ROS type
return dds_type_string;
}

std::string type_namespace = dds_type_string.substr(0, substring_position);
type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/");
size_t start = substring_position + substring.size();
std::string type_name = dds_type_string.substr(start, dds_type_string.length() - 1 - start);
return type_namespace + type_name;
}

} // namespace

///=============================================================================
rmw_ret_t GraphCache::get_topic_names_and_types(
rcutils_allocator_t * allocator,
Expand All @@ -642,10 +672,6 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

std::lock_guard<std::mutex> lock(graph_mutex_);
const size_t topic_number = graph_topics_.size();
// TODO(Yadunund): Delete.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"topic_number %ld", topic_number);

rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator);
if (ret != RMW_RET_OK) {
Expand All @@ -659,66 +685,44 @@ rmw_ret_t GraphCache::get_topic_names_and_types(
}
});

// topic_names_and_types->names is an rcutils_string_array_t,
// while topic_names_and_types->types is an rcutils_string_array_t *

// rcutils_ret_t rcutils_ret =
// rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator);
// if (rcutils_ret != RCUTILS_RET_OK) {
// return RMW_RET_BAD_ALLOC;
// }
auto free_topic_names = rcpputils::make_scope_exit(
[names = &topic_names_and_types->names]() {
rcutils_ret_t ret = rcutils_string_array_fini(names);
if (ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string().str);
}
});

rcutils_ret_t rcutils_ret =
rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator);
if (rcutils_ret != RCUTILS_RET_OK) {
return RMW_RET_BAD_ALLOC;
}
auto free_topic_types = rcpputils::make_scope_exit(
[types = topic_names_and_types->types]() {
rcutils_ret_t ret = rcutils_string_array_fini(types);
if (ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string().str);
}
});

// Fill topic names and types.
std::size_t j = 0;
for (const auto & it : graph_topics_) {
// Split based on "?".
// TODO(Yadunund): Be more systematic about this.
// TODO(clalancette): Rather than doing the splitting here, should we store
// it in graph_topics_ already split?
std::vector<std::string> parts = split_keyexpr(it.first, '?');
if (parts.size() < 2) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Invalid topic_key %s", it.first.c_str());
return RMW_RET_INVALID_ARGUMENT;
}
topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator);

topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator);
if (!topic_names_and_types->names.data[j]) {
return RMW_RET_BAD_ALLOC;
}
topic_names_and_types->types->data[j] = rcutils_strdup(
parts.at(1).c_str(), *allocator);
if (!topic_names_and_types->types->data[j]) {

// TODO(clalancette): This won't work if there are multiple types on the same topic
rcutils_ret_t rcutils_ret = rcutils_string_array_init(
&topic_names_and_types->types[j], 1, allocator);
if (RCUTILS_RET_OK != rcutils_ret) {
RMW_SET_ERROR_MSG(rcutils_get_error_string().str);
return RMW_RET_BAD_ALLOC;
}

topic_names_and_types->types[j].data[0] = rcutils_strdup(
_demangle_if_ros_type(parts[1]).c_str(), *allocator);
if (!topic_names_and_types->types[j].data[0]) {
return RMW_RET_BAD_ALLOC;
}

++j;
}

cleanup_names_and_types.cancel();
free_topic_names.cancel();
free_topic_types.cancel();

return RMW_RET_OK;
}
1 change: 0 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ class PublishToken
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{

struct TopicData
{
std::string topic;
Expand Down
57 changes: 0 additions & 57 deletions rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,62 +41,5 @@ rmw_get_topic_names_and_types(

return node->context->impl->graph_cache.get_topic_names_and_types(
allocator, no_demangle, topic_names_and_types);

// rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator);
// if (ret != RMW_RET_OK) {
// return ret;
// }
// auto cleanup_names_and_types = rcpputils::make_scope_exit(
// [topic_names_and_types] {
// rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
// if (fail_ret != RMW_RET_OK) {
// RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
// }
// });

// // topic_names_and_types->names is an rcutils_string_array_t,
// // while topic_names_and_types->types is an rcutils_string_array_t *

// topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator);
// if (topic_names_and_types->names.data[0] == nullptr) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic names");
// return RMW_RET_BAD_ALLOC;
// }
// auto free_names = rcpputils::make_scope_exit(
// [topic_names_and_types, allocator] {
// allocator->deallocate(topic_names_and_types->names.data[0], allocator->state);
// });

// rcutils_ret_t rcutils_ret = rcutils_string_array_init(
// topic_names_and_types->types, 1,
// allocator);
// if (rcutils_ret != RCUTILS_RET_OK) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic types");
// return RMW_RET_BAD_ALLOC;
// }
// auto fini_string_array = rcpputils::make_scope_exit(
// [topic_names_and_types] {
// rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types);
// if (fail_ret != RMW_RET_OK) {
// RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling");
// }
// });

// topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator);
// if (topic_names_and_types->types[0].data[0] == nullptr) {
// RMW_SET_ERROR_MSG("failed to allocate memory for topic data");
// return RMW_RET_BAD_ALLOC;
// }
// auto free_types = rcpputils::make_scope_exit(
// [topic_names_and_types, allocator] {
// allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state);
// });

// free_types.cancel();
// fini_string_array.cancel();
// free_names.cancel();
// cleanup_names_and_types.cancel();

return RMW_RET_OK;
}
} // extern "C"
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// Setup liveliness subscriptions for discovery.
const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id);

// Query the router/liveliness participants to get graph information before this session was started.
// Query router/liveliness participants to get graph information before this session was started.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Sending Query '%s' to fetch discovery data...",
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
#include <fastcdr/FastBuffer.h>
#include <fastcdr/Cdr.h>

#include <zenoh.h>

#include <chrono>
#include <mutex>
#include <new>
#include <sstream>

#include <zenoh.h>

#include "detail/guard_condition.hpp"
#include "detail/graph_cache.hpp"
#include "detail/identifier.hpp"
Expand Down Expand Up @@ -247,7 +247,8 @@ rmw_destroy_node(rmw_node_t * node)
// return RMW_RET_ERROR;
// }

// Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset.
// Undeclare liveliness token for the node to advertise that the node has ridden
// off into the sunset.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
zc_liveliness_undeclare_token(z_move(node_data->token));

Expand Down Expand Up @@ -1313,7 +1314,6 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)

auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
if (sub_data != nullptr) {

// Publish to the graph that a subscription has ridden off into the sunset
zc_liveliness_undeclare_token(z_move(sub_data->token));

Expand Down

0 comments on commit f7711c7

Please sign in to comment.