From c84e70dfaf48afebbc5fe1eda9d1d39557f26fb0 Mon Sep 17 00:00:00 2001 From: Yadu Date: Sun, 19 Nov 2023 23:55:01 +0800 Subject: [PATCH] Revert "Finish GraphCache implementation for Pub/Sub (#66)" This reverts commit fe77eef670a502086cc4cf07d9e0217c8e588790. --- README.md | 31 +- rmw_zenoh_cpp/CMakeLists.txt | 2 + .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 3 +- rmw_zenoh_cpp/package.xml | 1 + rmw_zenoh_cpp/src/detail/graph_cache.cpp | 588 +++++------------- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 139 +++-- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 19 +- .../src/rmw_get_topic_names_and_types.cpp | 69 +- rmw_zenoh_cpp/src/rmw_init.cpp | 33 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 172 ++--- 10 files changed, 364 insertions(+), 693 deletions(-) diff --git a/README.md b/README.md index 28814060..7203c35c 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Build `rmw_zenoh_cpp` ```bash mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src -git clone https://github.com/ros2/rmw_zenoh.git +git clone git@github.com:ros2/rmw_zenoh.git cd ~/ws_rmw_zenoh source /opt/ros//setup.bash # replace with ROS 2 distro of choice colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release @@ -34,42 +34,25 @@ colcon build --cmake-args -DCMAKE_BUILD_TYPE=Release ## Test -Make sure to source the built workspace using the commands below prior to running any other commands. +Source workspace ```bash cd ~/ws_rmw_zenoh source install/setup.bash ``` -### Start the zenoh router -> Note: Manually launching zenoh router won't be necessary in the future. +In a terminal launch Zenoh router: ```bash -# terminal 1 ros2 run rmw_zenoh_cpp init_rmw_zenoh_router ``` +> Note: Manually launching zenoh router won't be necessary in the future. -> Note: Without the zenoh router, nodes will not be able to discover each other since multicast discovery is disabled by default in the node's session config. Instead, nodes will receive discovery information about other peers via the zenoh router's gossip functionality. See more information on the session configs [below](#config). - -### Run the `talker` -```bash -# terminal 2 -export RMW_IMPLEMENTATION=rmw_zenoh_cpp -ros2 run demo_nodes_cpp talker -``` -> Note: Ignore all the warning printouts. +In a different terminal source install folder and execute: -### Run the `listener` ```bash -# terminal 2 export RMW_IMPLEMENTATION=rmw_zenoh_cpp -ros2 run demo_nodes_cpp listener +ros2 topic pub "/chatter" std_msgs/msg/String '{data: hello}' ``` -The listener node should start receiving messages over the `/chatter` topic. -> Note: Ignore all the warning printouts. - -### Graph introspection -Presently we only support limited `ros2cli` commands to introspect the ROS graph such as `ros2 node list` and `ros2 topic list`. - ## Config The [default configuration](rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5) sets up the zenoh sessions with the following main characteristics: @@ -86,7 +69,7 @@ A custom configuration may be provided by setting the `RMW_ZENOH_CONFIG_FILE` en ## TODO Features - [x] Publisher -- [x] Subscription +- [ ] Subscription - [ ] Client - [ ] Service - [ ] Graph introspection diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d3f93e72..f403c364 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) +find_package(yaml_cpp_vendor REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) @@ -56,6 +57,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw + yaml-cpp zenohc::lib ) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 506822d9..3fa61761 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -123,7 +123,7 @@ /// This option does not make LowLatency transport mandatory, the actual implementation of transport /// used will depend on Establish procedure and other party's settings /// - /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. + /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to /// enable 'lowlatency' you need to explicitly disable 'qos'. lowlatency: false, @@ -258,5 +258,4 @@ write: false, }, }, - } diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 02abec46..259bbce6 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -19,6 +19,7 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw + yaml_cpp_vendor ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 025b0047..a890f485 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -14,13 +14,11 @@ #include #include -#include #include #include #include #include -#include "rcpputils/find_and_replace.hpp" #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" @@ -39,19 +37,6 @@ std::string GenerateToken::liveliness(size_t domain_id) } ///============================================================================= -/** - * Generate a liveliness token for the particular entity. - * - * The liveliness tokens are in the form: - * - * @ros2_lv//// - * - * Where: - * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. - * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. - * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. - * - The ROS node name for this entity. - */ static std::string generate_base_token( const std::string & entity, size_t domain_id, @@ -59,17 +44,8 @@ static std::string generate_base_token( const std::string & name) { std::stringstream token_ss; - token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_; - // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". - // Hence we add an "_" to denote an empty namespace such that splitting the key - // will always result in 5 parts. - if (namespace_ == "/") { - token_ss << "_/"; - } else { - token_ss << "/"; - } - // Finally append node name. - token_ss << name; + // TODO(Yadunund): Empty namespace will contain /. Fix non-empty namespace. + token_ss << "@ros2_lv/" << domain_id << "/" << entity << namespace_ << name; return token_ss.str(); } @@ -96,20 +72,6 @@ std::string GenerateToken::publisher( return token; } -///============================================================================= -std::string GenerateToken::subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos) -{ - std::string token = generate_base_token("MS", domain_id, node_namespace, node_name); - token += topic + "/" + type + "/" + qos; - return token; -} - ///============================================================================= bool PublishToken::put( z_owned_session_t * session, @@ -172,18 +134,123 @@ bool PublishToken::del( } ///============================================================================= -namespace +PublisherData::PublisherData( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator) +: allocator_(allocator) +{ + // TODO(clalancette): Check for error + topic_name_ = rcutils_strdup(topic, *allocator); + + // TODO(clalancette): Check for error + node_name_ = rcutils_strdup(node, *allocator); + + // TODO(clalancette): Check for error + namespace_name_ = rcutils_strdup(namespace_, *allocator); + + // TODO(clalancette): Check for error + type_name_ = rcutils_strdup(type, *allocator); +} + +///============================================================================= +PublisherData::~PublisherData() +{ + allocator_->deallocate(topic_name_, allocator_->state); + allocator_->deallocate(node_name_, allocator_->state); + allocator_->deallocate(namespace_name_, allocator_->state); + allocator_->deallocate(type_name_, allocator_->state); +} + +///============================================================================= +SubscriptionData::SubscriptionData( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator) +: allocator_(allocator) +{ + // TODO(clalancette): Check for error + topic_name_ = rcutils_strdup(topic, *allocator); + + // TODO(clalancette): Check for error + node_name_ = rcutils_strdup(node, *allocator); + + // TODO(clalancette): Check for error + namespace_name_ = rcutils_strdup(namespace_, *allocator); + + // TODO(clalancette): Check for error + type_name_ = rcutils_strdup(type, *allocator); +} + +///============================================================================= +SubscriptionData::~SubscriptionData() +{ + allocator_->deallocate(topic_name_, allocator_->state); + allocator_->deallocate(node_name_, allocator_->state); + allocator_->deallocate(namespace_name_, allocator_->state); + allocator_->deallocate(type_name_, allocator_->state); +} + +///============================================================================= +uint64_t +GraphCache::add_publisher( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator) +{ + std::lock_guard lck(publishers_mutex_); + uint64_t this_handle_id = publishers_handle_id_++; + publishers_.emplace( + std::make_pair( + this_handle_id, std::make_unique(topic, node, namespace_, type, allocator))); + return this_handle_id; +} + +///============================================================================= +void +GraphCache::remove_publisher(uint64_t handle) +{ + std::lock_guard lck(publishers_mutex_); + if (publishers_.count(handle) == 0) { + return; + } + + publishers_.erase(handle); +} + +///============================================================================= +uint64_t +GraphCache::add_subscription( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator) +{ + std::lock_guard lck(subscriptions_mutex_); + uint64_t this_handle_id = subscriptions_handle_id_++; + subscriptions_.emplace( + std::make_pair( + this_handle_id, + std::make_unique(topic, node, namespace_, type, allocator))); + return this_handle_id; +} + +///============================================================================= +void +GraphCache::remove_subscription(uint64_t handle) { -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') + std::lock_guard lck(subscriptions_mutex_); + if (subscriptions_.count(handle) == 0) { + return; + } + + subscriptions_.erase(handle); +} + +///============================================================================= +static std::vector split_keyexpr(const std::string & keyexpr) { std::vector delim_idx = {}; // Insert -1 for starting position to make the split easier when using substr. delim_idx.push_back(-1); std::size_t idx = 0; for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { + if (*it == '/') { delim_idx.push_back(idx); } ++idx; @@ -203,184 +270,37 @@ std::vector split_keyexpr( result.push_back(keyexpr.substr(delim_idx.back() + 1)); return result; } + ///============================================================================= -// Convert a liveliness token into a -std::optional> _parse_token(const std::string & keyexpr) +void GraphCache::parse_put(const std::string & keyexpr) { + // TODO(Yadunund): Validate data. std::vector parts = split_keyexpr(keyexpr); - // At minimum, a token will contain 5 parts (@ros2_lv, domain_id, entity, namespace, node_name). - // Basic validation. - if (parts.size() < 5) { + if (parts.size() < 3) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); - return std::nullopt; - } - for (const std::string & p : parts) { - if (p.empty()) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - } - - // Get the entity, ie NN, MP, MS, SS, SC. - std::string & entity = parts[2]; - - GraphNode node; - // Nodes with empty namespaces will contain a "_". - node.ns = parts[3] == "_" ? "/" : "/" + parts[3]; - node.name = std::move(parts[4]); - - if (entity != "NN") { - if (parts.size() < 8) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Received invalid liveliness token"); - return std::nullopt; - } - GraphNode::TopicData data; - data.topic = "/" + std::move(parts[5]); - data.type = std::move(parts[6]); - data.qos = std::move(parts[7]); - - if (entity == "MP") { - node.pubs.push_back(std::move(data)); - } else if (entity == "MS") { - node.subs.push_back(std::move(data)); - } else if (entity == "SS") { - // TODO(yadunund): Service server - } else if (entity == "SC") { - // TODO(yadunund): Service client - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid entity [%s] in liveliness token", entity.c_str()); - return std::nullopt; - } - } - - return std::make_pair(std::move(entity), std::move(node)); -} -} // namespace - -///============================================================================= -GraphCache::TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) -: pub_count_(pub_count), - sub_count_(sub_count) -{ - // Do nothing. -} - -///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) -{ - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { - // Error message has already been logged. return; } - const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); + // Get the entity, ie N, MP, MS, SS, SC. + const std::string & entity = parts[2]; std::lock_guard lock(graph_mutex_); - if (entity == "NN") { // Node - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // New namespace. - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(node->ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second) { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", - node->name.c_str()); - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to the graph.", - node->name.c_str()); - return; - + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Adding node %s to the graph.", parts.back().c_str()); + const bool has_namespace = entity.size() == 5 ? true : false; + graph_[parts.back()] = YAML::Node(); + // TODO(Yadunund): Implement enclave support. + graph_[parts.back()]["enclave"] = ""; + graph_[parts.back()]["namespace"] = has_namespace ? parts.at(4) : "/"; } else if (entity == "MP") { // Publisher - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->pubs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(node->pubs[0]); - } else { - return; - } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->pub_count_; - } else { - insertion.first->second = std::make_unique(1, 0); - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str()); - return; } else if (entity == "MS") { // Subscription - auto ns_it = graph_.find(node->ns); - if (ns_it == graph_.end()) { - // Potential edge case where a liveliness update for a node creation was missed. - // So we add the node here. - std::string ns = node->ns; - std::unordered_map map = { - {node->name, node} - }; - graph_.insert(std::make_pair(std::move(ns), std::move(map))); - } else { - auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); - if (!insertion.second && !node->subs.empty()) { - // Node already exists so just append the publisher. - insertion.first->second->subs.push_back(node->subs[0]); - } else { - return; - } - } - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), nullptr)); - if (!insertion.second) { - // Such a topic already exists so we just increment its count. - ++insertion.first->second->sub_count_; - } else { - insertion.first->second = std::make_unique(0, 1); - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added subscription %s to node /%s in graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str()); - return; } else if (entity == "SS") { - // TODO(yadunund): Service server + // Service } else if (entity == "SC") { - // TODO(yadunud): Service Client + // Client } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -392,125 +312,33 @@ void GraphCache::parse_put(const std::string & keyexpr) ///============================================================================= void GraphCache::parse_del(const std::string & keyexpr) { - auto valid_token = _parse_token(keyexpr); - if (!valid_token.has_value()) { - // Error message has already been logged. + // TODO(Yadunund): Validate data. + std::vector parts = split_keyexpr(keyexpr); + if (parts.size() < 3) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received invalid liveliness token"); return; } - const std::string & entity = valid_token->first; - auto node = std::make_shared(std::move(valid_token->second)); + // Get the entity, ie N, MP, MS, SS, SC. + const std::string & entity = parts[2]; std::lock_guard lock(graph_mutex_); if (entity == "NN") { // Node - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - ns_it->second.erase(node->name); - } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removed node /%s from the graph.", - node->name.c_str() + "Removing node %s from the graph.", + parts.back().c_str() ); + graph_.remove(entity.back()); } else if (entity == "MP") { // Publisher - if (node->pubs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of publishers and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->pubs.begin(); - for (; erase_it != found_node->pubs.end(); ++erase_it) { - const auto & pub = *erase_it; - if (pub.topic == node->pubs.at(0).topic && - pub.type == node->pubs.at(0).type && - pub.qos == node->pubs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->pubs.end()) { - found_node->pubs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->pub_count_ == 1 && topic_it->second->sub_count_ == 0) { - // The last publisher was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->pub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed publisher %s from node /%s in the graph.", - node->pubs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } } else if (entity == "MS") { // Subscription - if (node->subs.empty()) { - // This should never happen but we make sure _parse_token() has no error. - return; - } - auto ns_it = graph_.find(node->ns); - if (ns_it != graph_.end()) { - auto node_it = ns_it->second.find(node->name); - if (node_it != ns_it->second.end()) { - const auto found_node = node_it->second; - // Here we iterate throught the list of subscriptions and remove the one - // with matching name, type and qos. - // TODO(Yadunund): This can be more optimal than O(n) with some caching. - auto erase_it = found_node->subs.begin(); - for (; erase_it != found_node->subs.end(); ++erase_it) { - const auto & sub = *erase_it; - if (sub.topic == node->subs.at(0).topic && - sub.type == node->subs.at(0).type && - sub.qos == node->subs.at(0).qos) - { - break; - } - } - if (erase_it != found_node->subs.end()) { - found_node->subs.erase(erase_it); - // Bookkeeping - // TODO(Yadunund): Be more systematic about generating the key. - std::string topic_key = node->subs.at(0).topic + "?" + node->subs.at(0).type; - auto topic_it = graph_topics_.find(topic_key); - if (topic_it != graph_topics_.end()) { - if (topic_it->second->sub_count_ == 1 && topic_it->second->pub_count_ == 0) { - // The last subscription was removed so we can delete this entry. - graph_topics_.erase(topic_key); - } else { - // Else we just decrement the count. - --topic_it->second->sub_count_; - } - } - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed subscription %s from node /%s in the graph.", - node->subs.at(0).topic.c_str(), - node->name.c_str() - ); - } - } - } } else if (entity == "SS") { - // TODO(yadunund): Service server + // Service } else if (entity == "SC") { - // TODO(yadunund): Service client + // Client } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -542,10 +370,7 @@ rmw_ret_t GraphCache::get_node_names( RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); - size_t nodes_number = 0; - for (auto it = graph_.begin(); it != graph_.end(); ++it) { - nodes_number += it->second.size(); - } + size_t nodes_number = graph_.size(); rcutils_ret_t rcutils_ret = rcutils_string_array_init(node_names, nodes_number, allocator); @@ -598,30 +423,31 @@ rmw_ret_t GraphCache::get_node_names( std::move(free_enclaves_lambda)); } + // TODO(Yadunund): Remove this printout. + const std::string & graph_str = YAML::Dump(graph_); + RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); // Fill node names, namespaces and enclaves. std::size_t j = 0; - for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { - const std::string & ns = ns_it->first; - for (auto node_it = ns_it->second.begin(); node_it != ns_it->second.end(); ++node_it) { - const auto node = node_it->second; - node_names->data[j] = rcutils_strdup(node->name.c_str(), *allocator); - if (!node_names->data[j]) { - return RMW_RET_BAD_ALLOC; - } - node_namespaces->data[j] = rcutils_strdup( - ns.c_str(), *allocator); - if (!node_namespaces->data[j]) { + for (auto it = graph_.begin(); it != graph_.end(); ++it) { + const auto & node_name = it->first.as(); + const auto & yaml_node = it->second; + node_names->data[j] = rcutils_strdup(node_name.c_str(), *allocator); + if (!node_names->data[j]) { + return RMW_RET_BAD_ALLOC; + } + node_namespaces->data[j] = rcutils_strdup( + yaml_node["namespace"].as().c_str(), *allocator); + if (!node_namespaces->data[j]) { + return RMW_RET_BAD_ALLOC; + } + if (enclaves) { + enclaves->data[j] = rcutils_strdup( + yaml_node["enclaves"].as().c_str(), *allocator); + if (!enclaves->data[j]) { return RMW_RET_BAD_ALLOC; } - if (enclaves) { - enclaves->data[j] = rcutils_strdup( - node->enclave.c_str(), *allocator); - if (!enclaves->data[j]) { - return RMW_RET_BAD_ALLOC; - } - } - ++j; } + ++j; } if (free_enclaves) { @@ -632,97 +458,3 @@ rmw_ret_t GraphCache::get_node_names( return RMW_RET_OK; } - -namespace -{ -// Shamelessly copied from https://github.com/ros2/rmw_cyclonedds/blob/f7f67bdef82f59558366aa6ce94ef9af3c5ab569/rmw_cyclonedds_cpp/src/demangle.cpp#L67 -std::string -_demangle_if_ros_type(const std::string & dds_type_string) -{ - if (dds_type_string[dds_type_string.size() - 1] != '_') { - // not a ROS type - return dds_type_string; - } - - std::string substring = "dds_::"; - size_t substring_position = dds_type_string.find(substring); - if (substring_position == std::string::npos) { - // not a ROS type - return dds_type_string; - } - - std::string type_namespace = dds_type_string.substr(0, substring_position); - type_namespace = rcpputils::find_and_replace(type_namespace, "::", "/"); - size_t start = substring_position + substring.size(); - std::string type_name = dds_type_string.substr(start, dds_type_string.length() - 1 - start); - return type_namespace + type_name; -} - -} // namespace - -///============================================================================= -rmw_ret_t GraphCache::get_topic_names_and_types( - rcutils_allocator_t * allocator, - bool no_demangle, - rmw_names_and_types_t * topic_names_and_types) -{ - static_cast(no_demangle); - RCUTILS_CHECK_ALLOCATOR_WITH_MSG( - allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); - - std::lock_guard lock(graph_mutex_); - const size_t topic_number = graph_topics_.size(); - - rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator); - if (ret != RMW_RET_OK) { - return ret; - } - auto cleanup_names_and_types = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); - } - }); - - // Fill topic names and types. - std::size_t j = 0; - for (const auto & it : graph_topics_) { - // Split based on "?". - // TODO(Yadunund): Be more systematic about this. - // TODO(clalancette): Rather than doing the splitting here, should we store - // it in graph_topics_ already split? - std::vector parts = split_keyexpr(it.first, '?'); - if (parts.size() < 2) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Invalid topic_key %s", it.first.c_str()); - return RMW_RET_INVALID_ARGUMENT; - } - - topic_names_and_types->names.data[j] = rcutils_strdup(parts[0].c_str(), *allocator); - if (!topic_names_and_types->names.data[j]) { - return RMW_RET_BAD_ALLOC; - } - - // TODO(clalancette): This won't work if there are multiple types on the same topic - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - &topic_names_and_types->types[j], 1, allocator); - if (RCUTILS_RET_OK != rcutils_ret) { - RMW_SET_ERROR_MSG(rcutils_get_error_string().str); - return RMW_RET_BAD_ALLOC; - } - - topic_names_and_types->types[j].data[0] = rcutils_strdup( - _demangle_if_ros_type(parts[1]).c_str(), *allocator); - if (!topic_names_and_types->types[j].data[0]) { - return RMW_RET_BAD_ALLOC; - } - - ++j; - } - - cleanup_names_and_types.cancel(); - - return RMW_RET_OK; -} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index f2de0018..0bb878b0 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -21,15 +21,14 @@ #include #include #include -#include #include #include "rcutils/allocator.h" #include "rcutils/types.h" #include "rmw/rmw.h" -#include "rmw/names_and_types.h" +#include "yaml-cpp/yaml.h" ///============================================================================= class GenerateToken @@ -50,14 +49,6 @@ class GenerateToken const std::string & topic, const std::string & type, const std::string & qos); - - static std::string subscription( - size_t domain_id, - const std::string & node_namespace, - const std::string & node_name, - const std::string & topic, - const std::string & type, - const std::string & qos); }; ///============================================================================= @@ -76,29 +67,59 @@ class PublishToken }; ///============================================================================= -// TODO(Yadunund): Expand to services and clients. -struct GraphNode +class PublisherData final { - struct TopicData - { - std::string topic; - std::string type; - std::string qos; - }; - - std::string ns; - std::string name; - // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? - std::string enclave; - std::vector pubs; - std::vector subs; +public: + PublisherData( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator); + + ~PublisherData(); + +private: + rcutils_allocator_t * allocator_; + char * topic_name_{nullptr}; + char * node_name_{nullptr}; + char * namespace_name_{nullptr}; + char * type_name_{nullptr}; +}; + +///============================================================================= +class SubscriptionData final +{ +public: + SubscriptionData( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator); + + ~SubscriptionData(); + +private: + rcutils_allocator_t * allocator_; + char * topic_name_{nullptr}; + char * node_name_{nullptr}; + char * namespace_name_{nullptr}; + char * type_name_{nullptr}; }; -using GraphNodePtr = std::shared_ptr; ///============================================================================= class GraphCache final { public: + uint64_t + add_publisher( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator); + + void remove_publisher(uint64_t publisher_handle); + + uint64_t + add_subscription( + const char * topic, const char * node, const char * namespace_, + const char * type, rcutils_allocator_t * allocator); + + void remove_subscription(uint64_t subscription_handle); + // Parse a PUT message over a token's key-expression and update the graph. void parse_put(const std::string & keyexpr); // Parse a DELETE message over a token's key-expression and update the graph. @@ -110,48 +131,36 @@ class GraphCache final rcutils_string_array_t * enclaves, rcutils_allocator_t * allocator) const; - rmw_ret_t get_topic_names_and_types( - rcutils_allocator_t * allocator, - bool no_demangle, - rmw_names_and_types_t * topic_names_and_types); - private: - /* - namespace_1: - node_1: - enclave: - publishers: [ - { - topic: - type: - qos: - } - ], - subscriptions: [ - { - topic: - type: - qos: - } - ], - namespace_2: - node_n: - */ - // Map namespace to a map of . - std::unordered_map> graph_ = {}; + std::mutex publishers_mutex_; + uint64_t publishers_handle_id_{0}; + std::map> publishers_; - // Optimize topic lookups mapping "topic_name?topic_type" keys to their pub/sub counts. - struct TopicStats - { - std::size_t pub_count_; - std::size_t sub_count_; - - // Constructor which initialized counters to 0. - TopicStats(std::size_t pub_count, std::size_t sub_count); - }; - using TopicStatsPtr = std::unique_ptr; - std::unordered_map graph_topics_ = {}; + std::mutex subscriptions_mutex_; + uint64_t subscriptions_handle_id_{0}; + std::map> subscriptions_; + /* + node_1: + enclave: + namespace: + publishers: [ + { + topic: + type: + qos: + } + ] + subscriptions: [ + { + topic: + type: + qos: + } + ] + node_n: + */ + YAML::Node graph_; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index d16e57ce..421932ac 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -58,10 +58,11 @@ struct rmw_context_impl_s ///============================================================================== struct rmw_node_data_t { - // TODO(Yadunund): Do we need a token at the node level? Right now I have one - // for cases where a node may spin up but does not have any publishers or subscriptions. - // Liveliness token for the node. - zc_owned_liveliness_token_t token; + // TODO(yadunund): Add a GraphCache object. + + // Map topic name to topic types. + std::unordered_set> publishers; + std::unordered_set> subscriptions; }; ///============================================================================== @@ -70,9 +71,6 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; - // Liveliness token for the publisher. - zc_owned_liveliness_token_t token; - // Type support fields const void * type_support_impl; const char * typesupport_identifier; @@ -80,6 +78,8 @@ struct rmw_publisher_data_t // Context for memory allocation for messages. rmw_context_t * context; + + uint64_t graph_cache_handle; }; ///============================================================================== @@ -113,9 +113,6 @@ struct rmw_subscription_data_t { z_owned_subscriber_t sub; - // Liveliness token for the subscription. - zc_owned_liveliness_token_t token; - const void * type_support_impl; const char * typesupport_identifier; MessageTypeSupport * type_support; @@ -129,6 +126,8 @@ struct rmw_subscription_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + + uint64_t graph_cache_handle; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index a4c80b4e..7e4f2645 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "detail/rmw_data_types.hpp" - #include "rcutils/strdup.h" #include "rmw/error_handling.h" @@ -33,13 +31,64 @@ rmw_get_topic_names_and_types( bool no_demangle, rmw_names_and_types_t * topic_names_and_types) { - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); - - return node->context->impl->graph_cache.get_topic_names_and_types( - allocator, no_demangle, topic_names_and_types); + static_cast(node); + static_cast(no_demangle); + + rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); + if (ret != RMW_RET_OK) { + return ret; + } + auto cleanup_names_and_types = rcpputils::make_scope_exit( + [topic_names_and_types] { + rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); + if (fail_ret != RMW_RET_OK) { + RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); + } + }); + + // topic_names_and_types->names is an rcutils_string_array_t, + // while topic_names_and_types->types is an rcutils_string_array_t * + + topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); + if (topic_names_and_types->names.data[0] == nullptr) { + RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); + return RMW_RET_BAD_ALLOC; + } + auto free_names = rcpputils::make_scope_exit( + [topic_names_and_types, allocator] { + allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); + }); + + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + topic_names_and_types->types, 1, + allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); + return RMW_RET_BAD_ALLOC; + } + auto fini_string_array = rcpputils::make_scope_exit( + [topic_names_and_types] { + rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); + if (fail_ret != RMW_RET_OK) { + RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); + } + }); + + topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); + if (topic_names_and_types->types[0].data[0] == nullptr) { + RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); + return RMW_RET_BAD_ALLOC; + } + auto free_types = rcpputils::make_scope_exit( + [topic_names_and_types, allocator] { + allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); + }); + + free_types.cancel(); + fini_string_array.cancel(); + free_names.cancel(); + cleanup_names_and_types.cancel(); + + return RMW_RET_OK; } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 51f22e3c..9b01b904 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -248,22 +248,18 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Setup liveliness subscriptions for discovery. const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id); - // Query router/liveliness participants to get graph information before this session was started. + // Query the router to get graph information before this session was started. + // TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available. RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data...", + "Sending Query '%s' to fetch discovery data from router...", liveliness_str.c_str() ); z_owned_reply_channel_t channel = zc_reply_fifo_new(16); - zc_liveliness_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // z_get_options_t opts = z_get_options_default(); - // z_get( - // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), - // &opts); // here, the send is moved and will be dropped by zenoh when adequate + z_get_options_t opts = z_get_options_default(); + z_get( + z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), + &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { if (z_reply_is_ok(&reply)) { @@ -288,23 +284,14 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) liveliness_str.c_str() ); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // auto sub_options = z_subscriber_options_default(); - // sub_options.reliability = Z_RELIABILITY_RELIABLE; - // context->impl->graph_subscriber = z_declare_subscriber( - // z_loan(context->impl->session), - // z_keyexpr(liveliness_str.c_str()), - // z_move(callback), - // &sub_options); - auto sub_options = zc_liveliness_subscriber_options_null(); + auto sub_options = z_subscriber_options_default(); + sub_options.reliability = Z_RELIABILITY_RELIABLE; z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = zc_liveliness_declare_subscriber( + context->impl->graph_subscriber = z_declare_subscriber( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(callback), &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); auto undeclare_z_sub = rcpputils::make_scope_exit( [context]() { z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 8e374753..5df96482 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -15,8 +15,6 @@ #include #include -#include - #include #include #include @@ -182,34 +180,12 @@ rmw_create_node( node->implementation_identifier = rmw_zenoh_identifier; node->context = context; - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new node is in town - // const bool pub_result = PublishToken::put( - // &node->context->impl->session, - // GenerateToken::node(context->actual_domain_id, namespace_, name) - // ); - // if (!pub_result) { - // return nullptr; - // } - // Initialize liveliness token for the node to advertise that a new node is in town. - rmw_node_data_t * node_data = static_cast(node->data); - node_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), - z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()), - NULL + const bool result = PublishToken::put( + &node->context->impl->session, + GenerateToken::node(context->actual_domain_id, namespace_, name) ); - auto free_token = rcpputils::make_scope_exit( - [node]() { - if (node->data != nullptr) { - rmw_node_data_t * node_data = static_cast(node->data); - z_drop(z_move(node_data->token)); - } - }); - if (!zc_liveliness_token_check(&node_data->token)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the node."); + if (!result) { return nullptr; } @@ -217,7 +193,6 @@ rmw_create_node( free_namespace.cancel(); free_name.cancel(); free_node.cancel(); - free_token.cancel(); return node; } @@ -229,28 +204,20 @@ rmw_destroy_node(rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a node has ridden off into the sunset - // const bool del_result = PublishToken::del( - // &node->context->impl->session, - // GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) - // ); - // if (!del_result) { - // return RMW_RET_ERROR; - // } - - // Undeclare liveliness token for the node to advertise that the node has ridden - // off into the sunset. - rmw_node_data_t * node_data = static_cast(node->data); - zc_liveliness_undeclare_token(z_move(node_data->token)); + const bool result = PublishToken::del( + &node->context->impl->session, + GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name) + ); + if (!result) { + return RMW_RET_ERROR; + } rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -549,49 +516,18 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. // Publish to the graph that a new publisher is in town // TODO(Yadunund): Publish liveliness for the new publisher. - // const bool pub_result = PublishToken::put( - // &node->context->impl->session, - // GenerateToken::publisher( - // node->context->actual_domain_id, - // node->namespace_, - // node->name, - // rmw_publisher->topic_name, - // publisher_data->type_support->get_name(), - // "reliable") - // ); - // if (!pub_result) { - // return nullptr; - // } - publisher_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), - z_keyexpr( - GenerateToken::publisher( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - "reliable").c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [publisher_data]() { - if (publisher_data != nullptr) { - zc_liveliness_undeclare_token(z_move(publisher_data->token)); - } + + publisher_data->graph_cache_handle = node->context->impl->graph_cache.add_publisher( + rmw_publisher->topic_name, node->name, node->namespace_, + publisher_data->type_support->get_name(), allocator); + auto remove_from_graph_cache = rcpputils::make_scope_exit( + [node, publisher_data]() { + node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); }); - if (!zc_liveliness_token_check(&publisher_data->token)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the publisher."); - return nullptr; - } - free_token.cancel(); + remove_from_graph_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -609,7 +545,6 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -623,29 +558,16 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) rmw_ret_t ret = RMW_RET_OK; + // Publish to the graph that a publisher has ridden off into the sunset + // TODO(Yadunund): Publish liveliness for the deleted publisher. + rcutils_allocator_t * allocator = &node->context->options.allocator; + allocator->deallocate(const_cast(publisher->topic_name), allocator->state); + auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // Publish to the graph that a publisher has ridden off into the sunset - // const bool del_result = PublishToken::del( - // &node->context->impl->session, - // GenerateToken::publisher( - // node->context->actual_domain_id, - // node->namespace_, - // node->name, - // publisher->topic_name, - // publisher_data->type_support->get_name(), - // "reliable" - // ) - // ); - // if (!del_result) { - // // TODO(Yadunund): Should this really return an error? - // return RMW_RET_ERROR; - // } - zc_liveliness_undeclare_token(z_move(publisher_data->token)); + node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state); @@ -655,7 +577,6 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) } allocator->deallocate(publisher_data, allocator->state); } - allocator->deallocate(const_cast(publisher->topic_name), allocator->state); allocator->deallocate(publisher, allocator->state); return ret; @@ -1253,32 +1174,18 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town - sub_data->token = zc_liveliness_declare_token( - z_loan(context_impl->session), - z_keyexpr( - GenerateToken::subscription( - node->context->actual_domain_id, - node->namespace_, - node->name, - rmw_subscription->topic_name, - sub_data->type_support->get_name(), - "reliable").c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [sub_data]() { - if (sub_data != nullptr) { - zc_liveliness_undeclare_token(z_move(sub_data->token)); - } + // TODO(Yadunund): Publish liveliness for the new subscription. + + + sub_data->graph_cache_handle = node->context->impl->graph_cache.add_subscription( + rmw_subscription->topic_name, node->name, node->namespace_, + sub_data->type_support->get_name(), allocator); + auto remove_from_graph_cache = rcpputils::make_scope_exit( + [node, sub_data]() { + node->context->impl->graph_cache.remove_subscription(sub_data->graph_cache_handle); }); - if (!zc_liveliness_token_check(&sub_data->token)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the subscription."); - return nullptr; - } - free_token.cancel(); + remove_from_graph_cache.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -1310,12 +1217,16 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) rmw_ret_t ret = RMW_RET_OK; + // Publish to the graph that a subscription has ridden off into the sunset + // TODO(Yadunund): Publish liveliness for the deleted subscription. + rcutils_allocator_t * allocator = &node->context->options.allocator; + allocator->deallocate(const_cast(subscription->topic_name), allocator->state); + auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { - // Publish to the graph that a subscription has ridden off into the sunset - zc_liveliness_undeclare_token(z_move(sub_data->token)); + node->context->impl->graph_cache.remove_subscription(sub_data->graph_cache_handle); RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); @@ -1328,7 +1239,6 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); allocator->deallocate(sub_data, allocator->state); } - allocator->deallocate(const_cast(subscription->topic_name), allocator->state); allocator->deallocate(subscription, allocator->state); return ret;