From 60c4380c9c6e43bbdde45f5092fcfeccdfc4fc6d Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 7 Feb 2024 22:24:39 -0500 Subject: [PATCH] Fix a number of small issues in rmw_zenoh_cpp (#106) * Keep a running total of the total number of nodes in the graph. That way we don't have to compute it when the RMW layer asks for it. Signed-off-by: Chris Lalancette * Make sure to remove an empty namespace from the graph cache. If all nodes from a particular namespace have been removed, make sure to remove that namespace as well. Signed-off-by: Chris Lalancette * Remove unnecessary TODO comments. Signed-off-by: Chris Lalancette * Remove PublishToken. While it is a nice idea, there is more than just liveliness that will prevent us from using zenoh-pico currently. Remove this code until we decide to officially support zenoh-pico. Signed-off-by: Chris Lalancette * Small cleanup for token error handling. We know that the pointer won't be NULL by that point, so we don't need the check. Signed-off-by: Chris Lalancette * Remove hand-coded rcutils_strdup implementation. There is really no need for it; we can just use rcutils_strdup instead. Signed-off-by: Chris Lalancette * Speed up ros_topic_name_to_zenoh_key. Use stringstream here is really slow; we can speed it up by about 100% by switching to std::to_string instead. Signed-off-by: Chris Lalancette * Switch to RAII for saved_msg_data. Signed-off-by: Chris Lalancette * Implement rmw_compare_gids_equal. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 17 +-- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 1 + rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 61 -------- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 15 -- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 6 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 2 + rmw_zenoh_cpp/src/rmw_zenoh.cpp | 133 +++++------------- 7 files changed, 49 insertions(+), 186 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index c442ddfd..7d322c20 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -205,6 +205,7 @@ void GraphCache::parse_put(const std::string & keyexpr) NodeMap node_map = { {entity.node_name(), make_graph_node(entity, *this)}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); + total_nodes_in_graph_ += 1; return; } @@ -226,6 +227,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // name but unique id. NodeMap::iterator insertion_it = ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); + total_nodes_in_graph_ += 1; if (insertion_it == ns_it->second.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -456,6 +458,10 @@ void GraphCache::parse_del(const std::string & keyexpr) remove_topics(graph_node->clients_, EntityType::Client, *this); } ns_it->second.erase(node_it); + total_nodes_in_graph_ -= 1; + if (ns_it->second.size() == 0) { + graph_.erase(entity.node_namespace()); + } return; } @@ -494,13 +500,8 @@ rmw_ret_t GraphCache::get_node_names( RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); - size_t nodes_number = 0; - for (const std::pair & it : graph_) { - nodes_number += it.second.size(); - } - rcutils_ret_t rcutils_ret = - rcutils_string_array_init(node_names, nodes_number, allocator); + rcutils_string_array_init(node_names, total_nodes_in_graph_, allocator); if (rcutils_ret != RCUTILS_RET_OK) { return RMW_RET_BAD_ALLOC; } @@ -515,7 +516,7 @@ rmw_ret_t GraphCache::get_node_names( }); rcutils_ret = - rcutils_string_array_init(node_namespaces, nodes_number, allocator); + rcutils_string_array_init(node_namespaces, total_nodes_in_graph_, allocator); if (rcutils_ret != RCUTILS_RET_OK) { return RMW_RET_BAD_ALLOC; } @@ -541,7 +542,7 @@ rmw_ret_t GraphCache::get_node_names( std::shared_ptr> free_enclaves{nullptr}; if (enclaves) { rcutils_ret = - rcutils_string_array_init(enclaves, nodes_number, allocator); + rcutils_string_array_init(enclaves, total_nodes_in_graph_, allocator); if (RCUTILS_RET_OK != rcutils_ret) { return RMW_RET_BAD_ALLOC; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 6add0f7c..8b66ac66 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -180,6 +180,7 @@ class GraphCache final using NamespaceMap = std::unordered_map; // Map namespace to a map of . NamespaceMap graph_ = {}; + size_t total_nodes_in_graph_{0}; // Optimize pub/sub lookups across the graph. GraphNode::TopicMap graph_topics_ = {}; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 430468f1..997f76b3 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -432,67 +432,6 @@ std::string Entity::keyexpr() const return this->keyexpr_; } -///============================================================================= -bool PublishToken::put( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } - - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); - return false; - } - - z_put_options_t options = z_put_options_default(); - options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { - RCUTILS_SET_ERROR_MSG("unable to publish liveliness for node creation"); - return false; - } - - return true; -} - -///============================================================================= -bool PublishToken::del( - z_owned_session_t * session, - const std::string & token) -{ - if (!z_session_check(session)) { - RCUTILS_SET_ERROR_MSG("The zenoh session is invalid."); - return false; - } - - // TODO(Yadunund): z_keyexpr_new creates a copy so find a way to avoid it. - z_owned_keyexpr_t keyexpr = z_keyexpr_new(token.c_str()); - auto drop_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); - return false; - } - - const z_delete_options_t options = z_delete_options_default(); - if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { - RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); - return false; - } - - return true; -} - ///============================================================================= std::string mangle_name(const std::string & input) { diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 031edc3a..390ecc68 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -119,21 +119,6 @@ class Entity std::string keyexpr_; }; -///============================================================================= -/// Helper utilities to put/delete tokens until liveliness is supported in the -/// zenoh-c bindings. -class PublishToken -{ -public: - static bool put( - z_owned_session_t * session, - const std::string & token); - - static bool del( - z_owned_session_t * session, - const std::string & token); -}; - /// Replace "/" instances with "%". std::string mangle_name(const std::string & input); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 99eb3fd9..5fc8e65d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -31,6 +31,11 @@ saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uin memcpy(publisher_gid, pub_gid, 16); } +saved_msg_data::~saved_msg_data() +{ + z_drop(z_move(payload)); +} + void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); @@ -92,7 +97,6 @@ void rmw_subscription_data_t::add_new_message( // queue if it is non-empty. if (!message_queue_.empty()) { std::unique_ptr old = std::move(message_queue_.front()); - z_drop(z_move(old->payload)); message_queue_.pop_front(); } } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 0100211f..e9bcfac9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -111,6 +111,8 @@ struct saved_msg_data { explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]); + ~saved_msg_data(); + zc_owned_payload_t payload; uint64_t recv_timestamp; uint8_t publisher_gid[16]; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5f9af074..2ffd5600 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -82,6 +81,8 @@ namespace z_owned_keyexpr_t ros_topic_name_to_zenoh_key( const char * const topic_name, size_t domain_id, rcutils_allocator_t * allocator) { + std::string d = std::to_string(domain_id); + size_t start_offset = 0; size_t topic_name_len = strlen(topic_name); size_t end_offset = topic_name_len; @@ -97,18 +98,17 @@ z_owned_keyexpr_t ros_topic_name_to_zenoh_key( } } - std::stringstream domain_ss; - domain_ss << domain_id; char * stripped_topic_name = rcutils_strndup( &topic_name[start_offset], end_offset - start_offset, *allocator); if (stripped_topic_name == nullptr) { return z_keyexpr_null(); } - z_owned_keyexpr_t keyexpr = z_keyexpr_join( - z_keyexpr(domain_ss.str().c_str()), z_keyexpr(stripped_topic_name)); + + z_owned_keyexpr_t ret = z_keyexpr_join(z_keyexpr(d.c_str()), z_keyexpr(stripped_topic_name)); + allocator->deallocate(stripped_topic_name, allocator->state); - return keyexpr; + return ret; } //============================================================================== @@ -259,17 +259,11 @@ rmw_create_node( allocator->deallocate(node, allocator->state); }); - size_t name_len = strlen(name); - // We specifically don't use rcutils_strdup() here because we want to avoid iterating over the - // name again looking for the \0 (we already did that above). - char * new_string = static_cast(allocator->allocate(name_len + 1, allocator->state)); + node->name = rcutils_strdup(name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( - new_string, + node->name, "unable to allocate memory for node name", return nullptr); - memcpy(new_string, name, name_len); - new_string[name_len] = '\0'; - node->name = new_string; auto free_name = rcpputils::make_scope_exit( [node, allocator]() { allocator->deallocate(const_cast(node->name), allocator->state); @@ -285,8 +279,6 @@ rmw_create_node( allocator->deallocate(const_cast(node->namespace_), allocator->state); }); - // TODO(yadunund): Register with storage system here and throw error if - // zenohd is not running. // Put metadata into node->data. node->data = allocator->zero_allocate(1, sizeof(rmw_node_data_t), allocator->state); RMW_CHECK_FOR_NULL_WITH_MSG( @@ -301,16 +293,6 @@ rmw_create_node( node->implementation_identifier = rmw_zenoh_identifier; node->context = context; - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // Publish to the graph that a new node is in town - // const bool pub_result = PublishToken::put( - // &node->context->impl->session, - // liveliness::GenerateToken::node(context->actual_domain_id, namespace_, name) - // ); - // if (!pub_result) { - // return nullptr; - // } // Initialize liveliness token for the node to advertise that a new node is in town. rmw_node_data_t * node_data = static_cast(node->data); const auto liveliness_entity = liveliness::Entity::make( @@ -329,11 +311,8 @@ rmw_create_node( NULL ); auto free_token = rcpputils::make_scope_exit( - [node]() { - if (node->data != nullptr) { - rmw_node_data_t * node_data = static_cast(node->data); - z_drop(z_move(node_data->token)); - } + [node_data]() { + z_drop(z_move(node_data->token)); }); if (!z_check(node_data->token)) { RCUTILS_LOG_ERROR_NAMED( @@ -342,11 +321,11 @@ rmw_create_node( return nullptr; } + free_token.cancel(); free_node_data.cancel(); free_namespace.cancel(); free_name.cancel(); free_node.cancel(); - free_token.cancel(); return node; } @@ -365,18 +344,6 @@ rmw_destroy_node(rmw_node_t * node) rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // Publish to the graph that a node has ridden off into the sunset - // const bool del_result = PublishToken::del( - // &node->context->impl->session, - // liveliness::GenerateToken::node(node->context->actual_domain_id, node->namespace_, - // node->name) - // ); - // if (!del_result) { - // return RMW_RET_ERROR; - // } - // Undeclare liveliness token for the node to advertise that the node has ridden // off into the sunset. rmw_node_data_t * node_data = static_cast(node->data); @@ -576,20 +543,13 @@ rmw_create_publisher( rmw_publisher->data = publisher_data; rmw_publisher->implementation_identifier = rmw_zenoh_identifier; rmw_publisher->options = *publisher_options; - // TODO(yadunund): Update this. rmw_publisher->can_loan_messages = false; - size_t topic_len = strlen(topic_name); - // We specifically don't use rcutils_strdup() here because we want to avoid iterating over the - // name again looking for the \0 (we already did that above). - char * new_string = static_cast(allocator->allocate(topic_len + 1, allocator->state)); + rmw_publisher->topic_name = rcutils_strdup(topic_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( - new_string, + rmw_publisher->topic_name, "Failed to allocate topic name", return nullptr); - memcpy(new_string, topic_name, topic_len); - new_string[topic_len] = '\0'; - rmw_publisher->topic_name = new_string; auto free_topic_name = rcpputils::make_scope_exit( [rmw_publisher, allocator]() { allocator->deallocate(const_cast(rmw_publisher->topic_name), allocator->state); @@ -650,23 +610,6 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // Publish to the graph that a new publisher is in town - // TODO(Yadunund): Publish liveliness for the new publisher. - // const bool pub_result = PublishToken::put( - // &node->context->impl->session, - // liveliness::GenerateToken::publisher( - // node->context->actual_domain_id, - // node->namespace_, - // node->name, - // rmw_publisher->topic_name, - // publisher_data->type_support->get_name(), - // "reliable") - // ); - // if (!pub_result) { - // return nullptr; - // } const auto liveliness_entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), liveliness::EntityType::Publisher, @@ -736,24 +679,6 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // Publish to the graph that a publisher has ridden off into the sunset - // const bool del_result = PublishToken::del( - // &node->context->impl->session, - // liveliness::GenerateToken::publisher( - // node->context->actual_domain_id, - // node->namespace_, - // node->name, - // publisher->topic_name, - // publisher_data->type_support->get_name(), - // "reliable" - // ) - // ); - // if (!del_result) { - // // TODO(Yadunund): Should this really return an error? - // return RMW_RET_ERROR; - // } z_drop(z_move(publisher_data->token)); if (publisher_data->pub_cache.has_value()) { z_drop(z_move(publisher_data->pub_cache.value())); @@ -1333,17 +1258,11 @@ rmw_create_subscription( rmw_subscription->implementation_identifier = rmw_zenoh_identifier; rmw_subscription->data = sub_data; - size_t topic_len = strlen(topic_name); - // We specifically don't use rcutils_strdup() here because we want to avoid iterating over the - // name again looking for the \0 (we already did that above). - char * new_string = static_cast(allocator->allocate(topic_len + 1, allocator->state)); + rmw_subscription->topic_name = rcutils_strdup(topic_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( - new_string, + rmw_subscription->topic_name, "Failed to allocate topic name", return nullptr); - memcpy(new_string, topic_name, topic_len); - new_string[topic_len] = '\0'; - rmw_subscription->topic_name = new_string; auto free_topic_name = rcpputils::make_scope_exit( [rmw_subscription, allocator]() { allocator->deallocate(const_cast(rmw_subscription->topic_name), allocator->state); @@ -1659,7 +1578,6 @@ static rmw_ret_t __rmw_take( } *taken = true; - z_drop(&msg_data->payload); // TODO(clalancette): fill in source_timestamp message_info->source_timestamp = 0; @@ -3586,10 +3504,23 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) rmw_ret_t rmw_compare_gids_equal(const rmw_gid_t * gid1, const rmw_gid_t * gid2, bool * result) { - static_cast(gid1); - static_cast(gid2); - static_cast(result); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(gid1, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + gid1, + gid1->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(gid2, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + gid2, + gid2->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(result, RMW_RET_INVALID_ARGUMENT); + + *result = memcmp(gid1->data, gid2->data, RMW_GID_STORAGE_SIZE) == 0; + + return RMW_RET_OK; } //==============================================================================