diff --git a/README.md b/README.md index 7203c35c..28814060 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Build `rmw_zenoh_cpp` ```bash mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src -git clone git@github.com:ros2/rmw_zenoh.git +git clone https://github.com/ros2/rmw_zenoh.git cd ~/ws_rmw_zenoh source /opt/ros//setup.bash # replace with ROS 2 distro of choice colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release @@ -34,25 +34,42 @@ colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release ## Test -Source workspace +Make sure to source the built workspace using the commands below prior to running any other commands. ```bash cd ~/ws_rmw_zenoh source install/setup.bash ``` -In a terminal launch Zenoh router: +### Start the zenoh router +> Note: Manually launching zenoh router won't be necessary in the future. ```bash +# terminal 1 ros2 run rmw_zenoh_cpp init_rmw_zenoh_router ``` -> Note: Manually launching zenoh router won't be necessary in the future. -In a different terminal source install folder and execute: +> Note: Without the zenoh router, nodes will not be able to discover each other since multicast discovery is disabled by default in the node's session config. Instead, nodes will receive discovery information about other peers via the zenoh router's gossip functionality. See more information on the session configs [below](#config). + +### Run the `talker` +```bash +# terminal 2 +export RMW_IMPLEMENTATION=rmw_zenoh_cpp +ros2 run demo_nodes_cpp talker +``` +> Note: Ignore all the warning printouts. +### Run the `listener` ```bash +# terminal 2 export RMW_IMPLEMENTATION=rmw_zenoh_cpp -ros2 topic pub "/chatter" std_msgs/msg/String '{data: hello}' +ros2 run demo_nodes_cpp listener ``` +The listener node should start receiving messages over the `/chatter` topic. +> Note: Ignore all the warning printouts. + +### Graph introspection +Presently we only support limited `ros2cli` commands to introspect the ROS graph such as `ros2 node list` and `ros2 topic list`. + ## Config The [default configuration](rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5) sets up the zenoh sessions with the following main characteristics: @@ -69,7 +86,7 @@ A custom configuration may be provided by setting the `RMW_ZENOH_CONFIG_FILE` en ## TODO Features - [x] Publisher -- [ ] Subscription +- [x] Subscription - [ ] Client - [ ] Service - [ ] Graph introspection diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index f403c364..d3f93e72 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,7 +21,6 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) -find_package(yaml_cpp_vendor REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) @@ -57,7 +56,6 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw - yaml-cpp zenohc::lib ) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 3fa61761..506822d9 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -123,7 +123,7 @@ /// This option does not make LowLatency transport mandatory, the actual implementation of transport /// used will depend on Establish procedure and other party's settings /// - /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. + /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to /// enable 'lowlatency' you need to explicitly disable 'qos'. lowlatency: false, @@ -258,4 +258,5 @@ write: false, }, }, + } diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 259bbce6..02abec46 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -19,7 +19,6 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw - yaml_cpp_vendor ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index a890f485..025b0047 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -14,11 +14,13 @@ #include #include +#include #include #include #include #include +#include "rcpputils/find_and_replace.hpp" #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" @@ -37,6 +39,19 @@ std::string GenerateToken::liveliness(size_t domain_id) } ///============================================================================= +/** + * Generate a liveliness token for the particular entity. + * + * The liveliness tokens are in the form: + * + * @ros2_lv//// + * + * Where: + * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. + * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. + * - The ROS node name for this entity. + */ static std::string generate_base_token( const std::string & entity, size_t domain_id, @@ -44,8 +59,17 @@ static std::string generate_base_token( const std::string & name) { std::stringstream token_ss; - // TODO(Yadunund): Empty namespace will contain /. Fix non-empty namespace. - token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_ << name; + token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; + // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". + // Hence we add an "_" to denote an empty namespace such that splitting the key + // will always result in 5 parts. + if (namespace_ == "/") { + token_ss << "_/"; + } else { + token_ss << "/"; + } + // Finally append node name. + token_ss << name; return token_ss.str(); } @@ -72,6 +96,20 @@ std::string GenerateToken::publisher( return token; } +///============================================================================= +std::string GenerateToken::subscription( + size_t domain_id, + const std::string & node_namespace, + const std::string & node_name, + const std::string & topic, + const std::string & type, + const std::string & qos) +{ + std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); + token += topic + "/" + type + "/" + qos; + return token; +} + ///============================================================================= bool PublishToken::put( z_owned_session_t * session, @@ -134,123 +172,18 @@ bool PublishToken::del( } ///============================================================================= -PublisherData::PublisherData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -: allocator_(allocator) -{ - // TODO(clalancette): Check for error - topic_name_ = rcutils_strdup(topic, *allocator); - - // TODO(clalancette): Check for error - node_name_ = rcutils_strdup(node, *allocator); - - // TODO(clalancette): Check for error - namespace_name_ = rcutils_strdup(namespace_, *allocator); - - // TODO(clalancette): Check for error - type_name_ = rcutils_strdup(type, *allocator); -} - -///============================================================================= -PublisherData::~PublisherData() -{ - allocator_->deallocate(topic_name_, allocator_->state); - allocator_->deallocate(node_name_, allocator_->state); - allocator_->deallocate(namespace_name_, allocator_->state); - allocator_->deallocate(type_name_, allocator_->state); -} - -///============================================================================= -SubscriptionData::SubscriptionData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -: allocator_(allocator) -{ - // TODO(clalancette): Check for error - topic_name_ = rcutils_strdup(topic, *allocator); - - // TODO(clalancette): Check for error - node_name_ = rcutils_strdup(node, *allocator); - - // TODO(clalancette): Check for error - namespace_name_ = rcutils_strdup(namespace_, *allocator); - - // TODO(clalancette): Check for error - type_name_ = rcutils_strdup(type, *allocator); -} - -///============================================================================= -SubscriptionData::~SubscriptionData() -{ - allocator_->deallocate(topic_name_, allocator_->state); - allocator_->deallocate(node_name_, allocator_->state); - allocator_->deallocate(namespace_name_, allocator_->state); - allocator_->deallocate(type_name_, allocator_->state); -} - -///============================================================================= -uint64_t -GraphCache::add_publisher( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -{ - std::lock_guard lck(publishers_mutex_); - uint64_t this_handle_id = publishers_handle_id_++; - publishers_.emplace( - std::make_pair( - this_handle_id, std::make_unique(topic, node, namespace_, type, allocator))); - return this_handle_id; -} - -///============================================================================= -void -GraphCache::remove_publisher(uint64_t handle) -{ - std::lock_guard lck(publishers_mutex_); - if (publishers_.count(handle) == 0) { - return; - } - - publishers_.erase(handle); -} - -///============================================================================= -uint64_t -GraphCache::add_subscription( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator) -{ - std::lock_guard lck(subscriptions_mutex_); - uint64_t this_handle_id = subscriptions_handle_id_++; - subscriptions_.emplace( - std::make_pair( - this_handle_id, - std::make_unique(topic, node, namespace_, type, allocator))); - return this_handle_id; -} - -///============================================================================= -void -GraphCache::remove_subscription(uint64_t handle) +namespace { - std::lock_guard lck(subscriptions_mutex_); - if (subscriptions_.count(handle) == 0) { - return; - } - - subscriptions_.erase(handle); -} - -///============================================================================= -static std::vector split_keyexpr(const std::string & keyexpr) +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') { std::vector delim_idx = {}; // Insert -1 for starting position to make the split easier when using substr. delim_idx.push_back(-1); std::size_t idx = 0; for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == '/') { + if (*it == delim) { delim_idx.push_back(idx); } ++idx; @@ -270,37 +203,184 @@ static std::vector split_keyexpr(const std::string & keyexpr) result.push_back(keyexpr.substr(delim_idx.back() + 1)); return result; } - ///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) +// Convert a liveliness token into a +std::optional> _parse_token(const std::string & keyexpr) { - // TODO(Yadunund): Validate data. std::vector parts = split_keyexpr(keyexpr); - if (parts.size() < 3) { + // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). + // Basic validation. + if (parts.size() < 5) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); + return std::nullopt; + } + for (const std::string & p : parts) { + if (p.empty()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + } + + // Get the entity, ie NN, MP, MS, SS, SC. + std::string & entity = parts[2]; + + GraphNode node; + // Nodes with empty namespaces will contain a "_". + node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; + node.name = std::move(parts[4]); + + if (entity != "NN") { + if (parts.size() < 8) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); + return std::nullopt; + } + GraphNode::TopicData data; + data.topic = "/" + std::move(parts[5]); + data.type = std::move(parts[6]); + data.qos = std::move(parts[7]); + + if (entity == "MP") { + node.pubs.push_back(std::move(data)); + } else if (entity == "MS") { + node.subs.push_back(std::move(data)); + } else if (entity == "SS") { + // TODO(yadunund): Service server + } else if (entity == "SC") { + // TODO(yadunund): Service client + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid entity [%s] in liveliness token", entity.c_str()); + return std::nullopt; + } + } + + return std::make_pair(std::move(entity), std::move(node)); +} +} // namespace + +///============================================================================= +GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) +: pub_count_(pub_count), + sub_count_(sub_count) +{ + // Do nothing. +} + +///============================================================================= +void GraphCache::parse_put(const std::string & keyexpr) +{ + auto valid_token = _parse_token(keyexpr); + if (!valid_token.has_value()) { + // Error message has already been logged. return; } - // Get the entity, ie N, MP, MS, SS, SC. - const std::string & entity = parts[2]; + const std::string & entity = valid_token->first; + auto node = std::make_shared(std::move(valid_token->second)); std::lock_guard lock(graph_mutex_); + if (entity == "NN") { // Node - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Adding node %s to the graph.", parts.back().c_str()); - const bool has_namespace = entity.size() == 5 ? true : false; - graph_[parts.back()] = YAML::Node(); - // TODO(Yadunund): Implement enclave support. - graph_[parts.back()]["enclave"] = ""; - graph_[parts.back()]["namespace"] = has_namespace ? parts.at(4) : "/"; + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // New namespace. + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", + node->name.c_str()); + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added node /%s to the graph.", + node->name.c_str()); + return; + } else if (entity == "MP") { // Publisher + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // Potential edge case where a liveliness update for a node creation was missed. + // So we add the node here. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second && !node->pubs.empty()) { + // Node already exists so just append the publisher. + insertion.first->second->pubs.push_back(node->pubs[0]); + } else { + return; + } + } + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); + if (!insertion.second) { + // Such a topic already exists so we just increment its count. + ++insertion.first->second->pub_count_; + } else { + insertion.first->second = std::make_unique(1, 0); + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", + node->pubs.at(0).topic.c_str(), + node->name.c_str()); + return; } else if (entity == "MS") { // Subscription + auto ns_it = graph_.find(node->ns); + if (ns_it == graph_.end()) { + // Potential edge case where a liveliness update for a node creation was missed. + // So we add the node here. + std::string ns = node->ns; + std::unordered_map map = { + {node->name, node} + }; + graph_.insert(std::make_pair(std::move(ns), std::move(map))); + } else { + auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); + if (!insertion.second && !node->subs.empty()) { + // Node already exists so just append the publisher. + insertion.first->second->subs.push_back(node->subs[0]); + } else { + return; + } + } + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); + if (!insertion.second) { + // Such a topic already exists so we just increment its count. + ++insertion.first->second->sub_count_; + } else { + insertion.first->second = std::make_unique(0, 1); + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", + node->subs.at(0).topic.c_str(), + node->name.c_str()); + return; } else if (entity == "SS") { - // Service + // TODO(yadunund): Service server } else if (entity == "SC") { - // Client + // TODO(yadunud): Service Client } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -312,33 +392,125 @@ void GraphCache::parse_put(const std::string & keyexpr) ///============================================================================= void GraphCache::parse_del(const std::string & keyexpr) { - // TODO(Yadunund): Validate data. - std::vector parts = split_keyexpr(keyexpr); - if (parts.size() < 3) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); + auto valid_token = _parse_token(keyexpr); + if (!valid_token.has_value()) { + // Error message has already been logged. return; } - // Get the entity, ie N, MP, MS, SS, SC. - const std::string & entity = parts[2]; + const std::string & entity = valid_token->first; + auto node = std::make_shared(std::move(valid_token->second)); std::lock_guard lock(graph_mutex_); if (entity == "NN") { // Node + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + ns_it->second.erase(node->name); + } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removing node %s from the graph.", - parts.back().c_str() + "Removed node /%s from the graph.", + node->name.c_str() ); - graph_.remove(entity.back()); } else if (entity == "MP") { // Publisher + if (node->pubs.empty()) { + // This should never happen but we make sure _parse_token() has no error. + return; + } + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + auto node_it = ns_it->second.find(node->name); + if (node_it != ns_it->second.end()) { + const auto found_node = node_it->second; + // Here we iterate throught the list of publishers and remove the one + // with matching name, type and qos. + // TODO(Yadunund): This can be more optimal than O(n) with some caching. + auto erase_it = found_node->pubs.begin(); + for (; erase_it != found_node->pubs.end(); ++erase_it) { + const auto & pub = *erase_it; + if (pub.topic == node->pubs.at(0).topic && + pub.type == node->pubs.at(0).type && + pub.qos == node->pubs.at(0).qos) + { + break; + } + } + if (erase_it != found_node->pubs.end()) { + found_node->pubs.erase(erase_it); + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; + auto topic_it = graph_topics_.find(topic_key); + if (topic_it != graph_topics_.end()) { + if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { + // The last publisher was removed so we can delete this entry. + graph_topics_.erase(topic_key); + } else { + // Else we just decrement the count. + --topic_it->second->pub_count_; + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Removed publisher %s from node /%s in the graph.", + node->pubs.at(0).topic.c_str(), + node->name.c_str() + ); + } + } + } } else if (entity == "MS") { // Subscription + if (node->subs.empty()) { + // This should never happen but we make sure _parse_token() has no error. + return; + } + auto ns_it = graph_.find(node->ns); + if (ns_it != graph_.end()) { + auto node_it = ns_it->second.find(node->name); + if (node_it != ns_it->second.end()) { + const auto found_node = node_it->second; + // Here we iterate throught the list of subscriptions and remove the one + // with matching name, type and qos. + // TODO(Yadunund): This can be more optimal than O(n) with some caching. + auto erase_it = found_node->subs.begin(); + for (; erase_it != found_node->subs.end(); ++erase_it) { + const auto & sub = *erase_it; + if (sub.topic == node->subs.at(0).topic && + sub.type == node->subs.at(0).type && + sub.qos == node->subs.at(0).qos) + { + break; + } + } + if (erase_it != found_node->subs.end()) { + found_node->subs.erase(erase_it); + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; + auto topic_it = graph_topics_.find(topic_key); + if (topic_it != graph_topics_.end()) { + if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { + // The last subscription was removed so we can delete this entry. + graph_topics_.erase(topic_key); + } else { + // Else we just decrement the count. + --topic_it->second->sub_count_; + } + } + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "Removed subscription %s from node /%s in the graph.", + node->subs.at(0).topic.c_str(), + node->name.c_str() + ); + } + } + } } else if (entity == "SS") { - // Service + // TODO(yadunund): Service server } else if (entity == "SC") { - // Client + // TODO(yadunund): Service client } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -370,7 +542,10 @@ rmw_ret_t GraphCache::get_node_names( RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); - size_t nodes_number = graph_.size(); + size_t nodes_number = 0; + for (auto it = graph_.begin(); it != graph_.end(); ++it) { + nodes_number += it->second.size(); + } rcutils_ret_t rcutils_ret = rcutils_string_array_init(node_names, nodes_number, allocator); @@ -423,31 +598,30 @@ rmw_ret_t GraphCache::get_node_names( std::move(free_enclaves_lambda)); } - // TODO(Yadunund): Remove this printout. - const std::string & graph_str = YAML::Dump(graph_); - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - const auto & node_name = it->first.as(); - const auto & yaml_node = it->second; - node_names->data[j] = rcutils_strdup(node_name.c_str(), *allocator); - if (!node_names->data[j]) { - return RMW_RET_BAD_ALLOC; - } - node_namespaces->data[j] = rcutils_strdup( - yaml_node["namespace"].as().c_str(), *allocator); - if (!node_namespaces->data[j]) { - return RMW_RET_BAD_ALLOC; - } - if (enclaves) { - enclaves->data[j] = rcutils_strdup( - yaml_node["enclaves"].as().c_str(), *allocator); - if (!enclaves->data[j]) { + for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { + const std::string & ns = ns_it->first; + for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { + const auto node = node_it->second; + node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); + if (!node_names->data[j]) { return RMW_RET_BAD_ALLOC; } + node_namespaces->data[j] = rcutils_strdup( + ns.c_str(), *allocator); + if (!node_namespaces->data[j]) { + return RMW_RET_BAD_ALLOC; + } + if (enclaves) { + enclaves->data[j] = rcutils_strdup( + node->enclave.c_str(), *allocator); + if (!enclaves->data[j]) { + return RMW_RET_BAD_ALLOC; + } + } + ++j; } - ++j; } if (free_enclaves) { @@ -458,3 +632,97 @@ rmw_ret_t GraphCache::get_node_names( return RMW_RET_OK; } + +namespace +{ +// Shamelessly copied from https://github.com/ros2/rmw_cyclonedds/blob/f7f67bdef82f59558366aa6ce94ef9af3c5ab569/rmw_cyclonedds_cpp/src/demangle.cpp#L67 +std::string +_demangle_if_ros_type(const std::string & dds_type_string) +{ + if (dds_type_string[dds_type_string.size() - 1] != '_') { + // not a ROS type + return dds_type_string; + } + + std::string substring = "dds_::"; + size_t substring_position = dds_type_string.find(substring); + if (substring_position == std::string::npos) { + // not a ROS type + return dds_type_string; + } + + std::string type_namespace = dds_type_string.substr(0, substring_position); + type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/"); + size_t start = substring_position + substring.size(); + std::string type_name = dds_type_string.substr(start, dds_type_string.length() - 1 - start); + return type_namespace + type_name; +} + +} // namespace + +///============================================================================= +rmw_ret_t GraphCache::get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + static_cast(no_demangle); + RCUTILS_CHECK_ALLOCATOR_WITH_MSG( + allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); + + std::lock_guard lock(graph_mutex_); + const size_t topic_number = graph_topics_.size(); + + rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator); + if (ret != RMW_RET_OK) { + return ret; + } + auto cleanup_names_and_types = rcpputils::make_scope_exit( + [topic_names_and_types] { + rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); + if (fail_ret != RMW_RET_OK) { + RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); + } + }); + + // Fill topic names and types. + std::size_t j = 0; + for (const auto & it : graph_topics_) { + // Split based on "?". + // TODO(Yadunund): Be more systematic about this. + // TODO(clalancette): Rather than doing the splitting here, should we store + // it in graph_topics_ already split? + std::vector parts = split_keyexpr(it.first, '?'); + if (parts.size() < 2) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid topic_key %s", it.first.c_str()); + return RMW_RET_INVALID_ARGUMENT; + } + + topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator); + if (!topic_names_and_types->names.data[j]) { + return RMW_RET_BAD_ALLOC; + } + + // TODO(clalancette): This won't work if there are multiple types on the same topic + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[j], 1, allocator); + if (RCUTILS_RET_OK != rcutils_ret) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); + return RMW_RET_BAD_ALLOC; + } + + topic_names_and_types->types[j].data[0] = rcutils_strdup( + _demangle_if_ros_type(parts[1]).c_str(), *allocator); + if (!topic_names_and_types->types[j].data[0]) { + return RMW_RET_BAD_ALLOC; + } + + ++j; + } + + cleanup_names_and_types.cancel(); + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 0bb878b0..f2de0018 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -21,14 +21,15 @@ #include #include #include +#include #include #include "rcutils/allocator.h" #include "rcutils/types.h" #include "rmw/rmw.h" +#include "rmw/names_and_types.h" -#include "yaml-cpp/yaml.h" ///============================================================================= class GenerateToken @@ -49,6 +50,14 @@ class GenerateToken const std::string & topic, const std::string & type, const std::string & qos); + + static std::string subscription( + size_t domain_id, + const std::string & node_namespace, + const std::string & node_name, + const std::string & topic, + const std::string & type, + const std::string & qos); }; ///============================================================================= @@ -67,59 +76,29 @@ class PublishToken }; ///============================================================================= -class PublisherData final +// TODO(Yadunund): Expand to services and clients. +struct GraphNode { -public: - PublisherData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - ~PublisherData(); - -private: - rcutils_allocator_t * allocator_; - char * topic_name_{nullptr}; - char * node_name_{nullptr}; - char * namespace_name_{nullptr}; - char * type_name_{nullptr}; -}; - -///============================================================================= -class SubscriptionData final -{ -public: - SubscriptionData( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - ~SubscriptionData(); - -private: - rcutils_allocator_t * allocator_; - char * topic_name_{nullptr}; - char * node_name_{nullptr}; - char * namespace_name_{nullptr}; - char * type_name_{nullptr}; + struct TopicData + { + std::string topic; + std::string type; + std::string qos; + }; + + std::string ns; + std::string name; + // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? + std::string enclave; + std::vector pubs; + std::vector subs; }; +using GraphNodePtr = std::shared_ptr; ///============================================================================= class GraphCache final { public: - uint64_t - add_publisher( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - void remove_publisher(uint64_t publisher_handle); - - uint64_t - add_subscription( - const char * topic, const char * node, const char * namespace_, - const char * type, rcutils_allocator_t * allocator); - - void remove_subscription(uint64_t subscription_handle); - // Parse a PUT message over a token's key-expression and update the graph. void parse_put(const std::string & keyexpr); // Parse a DELETE message over a token's key-expression and update the graph. @@ -131,36 +110,48 @@ class GraphCache final rcutils_string_array_t * enclaves, rcutils_allocator_t * allocator) const; -private: - std::mutex publishers_mutex_; - uint64_t publishers_handle_id_{0}; - std::map> publishers_; - - std::mutex subscriptions_mutex_; - uint64_t subscriptions_handle_id_{0}; - std::map> subscriptions_; + rmw_ret_t get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types); +private: /* - node_1: - enclave: - namespace: - publishers: [ - { - topic: - type: - qos: - } - ] - subscriptions: [ - { - topic: - type: - qos: - } - ] - node_n: + namespace_1: + node_1: + enclave: + publishers: [ + { + topic: + type: + qos: + } + ], + subscriptions: [ + { + topic: + type: + qos: + } + ], + namespace_2: + node_n: */ - YAML::Node graph_; + // Map namespace to a map of . + std::unordered_map> graph_ = {}; + + // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. + struct TopicStats + { + std::size_t pub_count_; + std::size_t sub_count_; + + // Constructor which initialized counters to 0. + TopicStats(std::size_t pub_count, std::size_t sub_count); + }; + using TopicStatsPtr = std::unique_ptr; + std::unordered_map graph_topics_ = {}; + mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 421932ac..d16e57ce 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -58,11 +58,10 @@ struct rmw_context_impl_s ///============================================================================== struct rmw_node_data_t { - // TODO(yadunund): Add a GraphCache object. - - // Map topic name to topic types. - std::unordered_set> publishers; - std::unordered_set> subscriptions; + // TODO(Yadunund): Do we need a token at the node level? Right now I have one + // for cases where a node may spin up but does not have any publishers or subscriptions. + // Liveliness token for the node. + zc_owned_liveliness_token_t token; }; ///============================================================================== @@ -71,6 +70,9 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; + // Liveliness token for the publisher. + zc_owned_liveliness_token_t token; + // Type support fields const void * type_support_impl; const char * typesupport_identifier; @@ -78,8 +80,6 @@ struct rmw_publisher_data_t // Context for memory allocation for messages. rmw_context_t * context; - - uint64_t graph_cache_handle; }; ///============================================================================== @@ -113,6 +113,9 @@ struct rmw_subscription_data_t { z_owned_subscriber_t sub; + // Liveliness token for the subscription. + zc_owned_liveliness_token_t token; + const void * type_support_impl; const char * typesupport_identifier; MessageTypeSupport * type_support; @@ -126,8 +129,6 @@ struct rmw_subscription_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; - - uint64_t graph_cache_handle; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index 7e4f2645..a4c80b4e 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "detail/rmw_data_types.hpp" + #include "rcutils/strdup.h" #include "rmw/error_handling.h" @@ -31,64 +33,13 @@ rmw_get_topic_names_and_types( bool no_demangle, rmw_names_and_types_t * topic_names_and_types) { - static_cast(node); - static_cast(no_demangle); - - rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); - if (ret != RMW_RET_OK) { - return ret; - } - auto cleanup_names_and_types = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); - } - }); - - // topic_names_and_types->names is an rcutils_string_array_t, - // while topic_names_and_types->types is an rcutils_string_array_t * - - topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); - if (topic_names_and_types->names.data[0] == nullptr) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); - return RMW_RET_BAD_ALLOC; - } - auto free_names = rcpputils::make_scope_exit( - [topic_names_and_types, allocator] { - allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); - }); - - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - topic_names_and_types->types, 1, - allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); - return RMW_RET_BAD_ALLOC; - } - auto fini_string_array = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); - } - }); - - topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); - if (topic_names_and_types->types[0].data[0] == nullptr) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); - return RMW_RET_BAD_ALLOC; - } - auto free_types = rcpputils::make_scope_exit( - [topic_names_and_types, allocator] { - allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); - }); - - free_types.cancel(); - fini_string_array.cancel(); - free_names.cancel(); - cleanup_names_and_types.cancel(); - - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.get_topic_names_and_types( + allocator, no_demangle, topic_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 9b01b904..51f22e3c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -248,18 +248,22 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Setup liveliness subscriptions for discovery. const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); - // Query the router to get graph information before this session was started. - // TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available. + // Query router/liveliness participants to get graph information before this session was started. RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data from router...", + "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); z_owned_reply_channel_t channel = zc_reply_fifo_new(16); - z_get_options_t opts = z_get_options_default(); - z_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), - &opts); // here, the send is moved and will be dropped by zenoh when adequate + zc_liveliness_get( + z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // z_get_options_t opts = z_get_options_default(); + // z_get( + // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), + // &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { if (z_reply_is_ok(&reply)) { @@ -284,14 +288,23 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) liveliness_str.c_str() ); - auto sub_options = z_subscriber_options_default(); - sub_options.reliability = Z_RELIABILITY_RELIABLE; + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // auto sub_options = z_subscriber_options_default(); + // sub_options.reliability = Z_RELIABILITY_RELIABLE; + // context->impl->graph_subscriber = z_declare_subscriber( + // z_loan(context->impl->session), + // z_keyexpr(liveliness_str.c_str()), + // z_move(callback), + // &sub_options); + auto sub_options = zc_liveliness_subscriber_options_null(); z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = z_declare_subscriber( + context->impl->graph_subscriber = zc_liveliness_declare_subscriber( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(callback), &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); auto undeclare_z_sub = rcpputils::make_scope_exit( [context]() { z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5df96482..8e374753 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -15,6 +15,8 @@ #include #include +#include + #include #include #include @@ -180,12 +182,34 @@ rmw_create_node( node->implementation_identifier = rmw_zenoh_identifier; node->context = context; + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new node is in town - const bool result = PublishToken::put( - &node->context->impl->session, - GenerateToken::node(context->actual_domain_id, namespace_, name) + // const bool pub_result = PublishToken::put( + // &node->context->impl->session, + // GenerateToken::node(context->actual_domain_id, namespace_, name) + // ); + // if (!pub_result) { + // return nullptr; + // } + // Initialize liveliness token for the node to advertise that a new node is in town. + rmw_node_data_t * node_data = static_cast(node->data); + node_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), + NULL ); - if (!result) { + auto free_token = rcpputils::make_scope_exit( + [node]() { + if (node->data != nullptr) { + rmw_node_data_t * node_data = static_cast(node->data); + z_drop(z_move(node_data->token)); + } + }); + if (!zc_liveliness_token_check(&node_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the node."); return nullptr; } @@ -193,6 +217,7 @@ rmw_create_node( free_namespace.cancel(); free_name.cancel(); free_node.cancel(); + free_token.cancel(); return node; } @@ -204,20 +229,28 @@ rmw_destroy_node(rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a node has ridden off into the sunset - const bool result = PublishToken::del( - &node->context->impl->session, - GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) - ); - if (!result) { - return RMW_RET_ERROR; - } + // const bool del_result = PublishToken::del( + // &node->context->impl->session, + // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + // ); + // if (!del_result) { + // return RMW_RET_ERROR; + // } + + // Undeclare liveliness token for the node to advertise that the node has ridden + // off into the sunset. + rmw_node_data_t * node_data = static_cast(node->data); + zc_liveliness_undeclare_token(z_move(node_data->token)); rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -516,18 +549,49 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new publisher is in town // TODO(Yadunund): Publish liveliness for the new publisher. - - publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher( - rmw_publisher->topic_name, node->name, node->namespace_, - publisher_data->type_support->get_name(), allocator); - auto remove_from_graph_cache = rcpputils::make_scope_exit( - [node, publisher_data]() { - node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); + // const bool pub_result = PublishToken::put( + // &node->context->impl->session, + // GenerateToken::publisher( + // node->context->actual_domain_id, + // node->namespace_, + // node->name, + // rmw_publisher->topic_name, + // publisher_data->type_support->get_name(), + // "reliable") + // ); + // if (!pub_result) { + // return nullptr; + // } + publisher_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr( + GenerateToken::publisher( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_publisher->topic_name, + publisher_data->type_support->get_name(), + "reliable").c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [publisher_data]() { + if (publisher_data != nullptr) { + zc_liveliness_undeclare_token(z_move(publisher_data->token)); + } }); + if (!zc_liveliness_token_check(&publisher_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the publisher."); + return nullptr; + } - remove_from_graph_cache.cancel(); + free_token.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -545,6 +609,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -558,16 +623,29 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) rmw_ret_t ret = RMW_RET_OK; - // Publish to the graph that a publisher has ridden off into the sunset - // TODO(Yadunund): Publish liveliness for the deleted publisher. - rcutils_allocator_t * allocator = &node->context->options.allocator; - allocator->deallocate(const_cast(publisher->topic_name), allocator->state); - auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { - node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); + // Uncomment and rely on #if #endif blocks to enable this feature when building with + // zenoh-pico since liveliness is only available in zenoh-c. + // Publish to the graph that a publisher has ridden off into the sunset + // const bool del_result = PublishToken::del( + // &node->context->impl->session, + // GenerateToken::publisher( + // node->context->actual_domain_id, + // node->namespace_, + // node->name, + // publisher->topic_name, + // publisher_data->type_support->get_name(), + // "reliable" + // ) + // ); + // if (!del_result) { + // // TODO(Yadunund): Should this really return an error? + // return RMW_RET_ERROR; + // } + zc_liveliness_undeclare_token(z_move(publisher_data->token)); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state); @@ -577,6 +655,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) } allocator->deallocate(publisher_data, allocator->state); } + allocator->deallocate(const_cast(publisher->topic_name), allocator->state); allocator->deallocate(publisher, allocator->state); return ret; @@ -1174,18 +1253,32 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town - // TODO(Yadunund): Publish liveliness for the new subscription. - - - sub_data->graph_cache_handle = node->context->impl->graph_cache.add_subscription( - rmw_subscription->topic_name, node->name, node->namespace_, - sub_data->type_support->get_name(), allocator); - auto remove_from_graph_cache = rcpputils::make_scope_exit( - [node, sub_data]() { - node->context->impl->graph_cache.remove_subscription(sub_data->graph_cache_handle); + sub_data->token = zc_liveliness_declare_token( + z_loan(context_impl->session), + z_keyexpr( + GenerateToken::subscription( + node->context->actual_domain_id, + node->namespace_, + node->name, + rmw_subscription->topic_name, + sub_data->type_support->get_name(), + "reliable").c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [sub_data]() { + if (sub_data != nullptr) { + zc_liveliness_undeclare_token(z_move(sub_data->token)); + } }); + if (!zc_liveliness_token_check(&sub_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); + return nullptr; + } - remove_from_graph_cache.cancel(); + free_token.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -1217,16 +1310,12 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) rmw_ret_t ret = RMW_RET_OK; - // Publish to the graph that a subscription has ridden off into the sunset - // TODO(Yadunund): Publish liveliness for the deleted subscription. - rcutils_allocator_t * allocator = &node->context->options.allocator; - allocator->deallocate(const_cast(subscription->topic_name), allocator->state); - auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { - node->context->impl->graph_cache.remove_subscription(sub_data->graph_cache_handle); + // Publish to the graph that a subscription has ridden off into the sunset + zc_liveliness_undeclare_token(z_move(sub_data->token)); RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); @@ -1239,6 +1328,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); allocator->deallocate(sub_data, allocator->state); } + allocator->deallocate(const_cast(subscription->topic_name), allocator->state); allocator->deallocate(subscription, allocator->state); return ret;