From 08704afd87519988455333dd053c0414cb0c0715 Mon Sep 17 00:00:00 2001 From: Yadu Date: Mon, 24 Jun 2024 11:39:11 -0700 Subject: [PATCH] Include topic type and hash in key expression (#171) * Include topic type in keyexpr Signed-off-by: Yadunund * Include type hash in keyexpr Signed-off-by: Yadunund * Update rmw_zenoh_cpp/src/rmw_zenoh.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu * Address feedback Signed-off-by: Yadunund * Update rmw_zenoh_cpp/src/rmw_zenoh.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu * Avoid one allocation Signed-off-by: Yadunund --------- Signed-off-by: Yadunund Signed-off-by: Yadu Co-authored-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 6 + rmw_zenoh_cpp/src/rmw_zenoh.cpp | 180 ++++++++++++++++---- 2 files changed, 150 insertions(+), 36 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 1c3c48e2..1ae8b736 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -32,6 +32,8 @@ #include "rmw/rmw.h" +#include "rosidl_runtime_c/type_hash.h" + #include "event.hpp" #include "graph_cache.hpp" #include "message_type_support.hpp" @@ -109,6 +111,7 @@ class rmw_publisher_data_t final // Type support fields const void * type_support_impl; const char * typesupport_identifier; + const rosidl_type_hash_t * type_hash; MessageTypeSupport * type_support; // Context for memory allocation for messages. @@ -174,6 +177,7 @@ class rmw_subscription_data_t final const void * type_support_impl; const char * typesupport_identifier; + const rosidl_type_hash_t * type_hash; MessageTypeSupport * type_support; rmw_context_t * context; @@ -245,6 +249,7 @@ class rmw_service_data_t final const void * request_type_support_impl; const void * response_type_support_impl; const char * typesupport_identifier; + const rosidl_type_hash_t * type_hash; RequestTypeSupport * request_type_support; ResponseTypeSupport * response_type_support; @@ -316,6 +321,7 @@ class rmw_client_data_t final const void * request_type_support_impl; const void * response_type_support_impl; const char * typesupport_identifier; + const rosidl_type_hash_t * type_hash; RequestTypeSupport * request_type_support; ResponseTypeSupport * response_type_support; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a9d0515b..0356be80 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -69,21 +69,47 @@ namespace { +//============================================================================== +// Helper function to create a copy of a string after removing any +// leading or trailing slashes. +std::string strip_slashes(const char * const str) +{ + std::string ret = std::string(str); + const std::size_t len = strlen(str); + std::size_t start = 0; + std::size_t end = len - 1; + if (str[0] == '/') { + ++start; + } + if (str[end] == '/') { + --end; + } + return ret.substr(start, end - start + 1); +} //============================================================================== -// A function to take ros topic names and convert them to valid Zenoh keys. +// A function that generates a key expression for message transport of the format +// /// // In particular, Zenoh keys cannot start or end with a /, so this function // will strip them out. -// The Zenoh key is also prefixed with the ros_domain_id. // Performance note: at present, this function allocates a new string and copies // the old string into it. If this becomes a performance problem, we could consider // modifying the topic_name in place. But this means we need to be much more // careful about who owns the string. -z_owned_keyexpr_t ros_topic_name_to_zenoh_key(const char * const topic_name, size_t domain_id) -{ - const std::string keyexpr_str = std::to_string(domain_id) + - "/" + - rmw_zenoh_cpp::liveliness::mangle_name(topic_name); +z_owned_keyexpr_t ros_topic_name_to_zenoh_key( + const std::size_t domain_id, + const char * const topic_name, + const char * const topic_type, + const char * const type_hash) +{ + std::string keyexpr_str = std::to_string(domain_id); + keyexpr_str += "/"; + keyexpr_str += strip_slashes(topic_name); + keyexpr_str += "/"; + keyexpr_str += topic_type; + keyexpr_str += "/"; + keyexpr_str += type_hash; + return z_keyexpr_new(keyexpr_str.c_str()); } @@ -535,6 +561,7 @@ rmw_create_publisher( RMW_ZENOH_DEFAULT_HISTORY_DEPTH; publisher_data->typesupport_identifier = type_support->typesupport_identifier; + publisher_data->type_hash = type_support->get_type_hash_func(type_support); publisher_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); publisher_data->type_support = static_cast( @@ -576,8 +603,27 @@ rmw_create_publisher( allocator->deallocate(const_cast(rmw_publisher->topic_name), allocator->state); }); + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + publisher_data->type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - topic_name, node->context->actual_domain_id); + node->context->actual_domain_id, + topic_name, + publisher_data->type_support->get_name(), + type_hash_c_str); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -1325,6 +1371,7 @@ rmw_create_subscription( RMW_ZENOH_DEFAULT_HISTORY_DEPTH; sub_data->typesupport_identifier = type_support->typesupport_identifier; + sub_data->type_hash = type_support->get_type_hash_func(type_support); sub_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); sub_data->type_support = static_cast( @@ -1368,12 +1415,30 @@ rmw_create_subscription( rmw_subscription->can_loan_messages = false; rmw_subscription->is_cft_enabled = false; + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + sub_data->type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + // Everything above succeeded and is setup properly. Now declare a subscriber // with Zenoh; after this, callbacks may come in at any time. - z_owned_closure_sample_t callback = z_closure(rmw_zenoh_cpp::sub_data_handler, nullptr, sub_data); z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - topic_name, node->context->actual_domain_id); + node->context->actual_domain_id, + topic_name, + sub_data->type_support->get_name(), + type_hash_c_str); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -2007,6 +2072,7 @@ rmw_create_client( client_data->context = node->context; client_data->typesupport_identifier = type_support->typesupport_identifier; + client_data->type_hash = type_support->get_type_hash_func(type_support); client_data->request_type_support_impl = request_members; client_data->response_type_support_impl = response_members; @@ -2072,17 +2138,6 @@ rmw_create_client( allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - client_data->keyexpr = ros_topic_name_to_zenoh_key( - rmw_client->service_name, node->context->actual_domain_id); - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [client_data]() { - z_keyexpr_drop(z_move(client_data->keyexpr)); - }); - if (!z_keyexpr_check(&client_data->keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - // Note: Service request/response types will contain a suffix Request_ or Response_. // We remove the suffix when appending the type to the liveliness tokens for // better reusability within GraphCache. @@ -2097,6 +2152,37 @@ rmw_create_client( service_type.c_str(), rmw_client->service_name); return nullptr; } + + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + client_data->type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + client_data->keyexpr = ros_topic_name_to_zenoh_key( + node->context->actual_domain_id, + rmw_client->service_name, + service_type.c_str(), + type_hash_c_str); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_keyexpr_drop(z_move(client_data->keyexpr)); + }); + if (!z_keyexpr_check(&client_data->keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), std::to_string(node_data->id), @@ -2551,6 +2637,7 @@ rmw_create_service( service_data->context = node->context; service_data->typesupport_identifier = type_support->typesupport_identifier; + service_data->type_hash = type_support->get_type_hash_func(type_support); service_data->request_type_support_impl = request_members; service_data->response_type_support_impl = response_members; @@ -2611,8 +2698,43 @@ rmw_create_service( [rmw_service, allocator]() { allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); }); + + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = service_data->response_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Response_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for service %s. Report this bug", + service_type.c_str(), rmw_service->service_name); + return nullptr; + } + + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + service_data->type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + service_data->keyexpr = ros_topic_name_to_zenoh_key( - rmw_service->service_name, node->context->actual_domain_id); + node->context->actual_domain_id, + rmw_service->service_name, + service_type.c_str(), + type_hash_c_str); auto free_ros_keyexpr = rcpputils::make_scope_exit( [service_data]() { if (service_data) { @@ -2645,20 +2767,6 @@ rmw_create_service( z_undeclare_queryable(z_move(service_data->qable)); }); - // Note: Service request/response types will contain a suffix Request_ or Response_. - // We remove the suffix when appending the type to the liveliness tokens for - // better reusability within GraphCache. - std::string service_type = service_data->response_type_support->get_name(); - size_t suffix_substring_position = service_type.find("Response_"); - if (std::string::npos != suffix_substring_position) { - service_type = service_type.substr(0, suffix_substring_position); - } else { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unexpected type %s for service %s. Report this bug", - service_type.c_str(), rmw_service->service_name); - return nullptr; - } service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), std::to_string(node_data->id),