Skip to content

Commit

Permalink
Include topic type and hash in key expression (#171)
Browse files Browse the repository at this point in the history
* Include topic type in keyexpr

Signed-off-by: Yadunund <[email protected]>

* Include type hash in keyexpr

Signed-off-by: Yadunund <[email protected]>

* Update rmw_zenoh_cpp/src/rmw_zenoh.cpp

Co-authored-by: Chris Lalancette <[email protected]>
Signed-off-by: Yadu <[email protected]>

* Address feedback

Signed-off-by: Yadunund <[email protected]>

* Update rmw_zenoh_cpp/src/rmw_zenoh.cpp

Co-authored-by: Chris Lalancette <[email protected]>
Signed-off-by: Yadu <[email protected]>

* Avoid one allocation

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
Signed-off-by: Yadu <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
Yadunund and clalancette authored Jun 24, 2024
1 parent b48713a commit 08704af
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 36 deletions.
6 changes: 6 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#include "rmw/rmw.h"

#include "rosidl_runtime_c/type_hash.h"

#include "event.hpp"
#include "graph_cache.hpp"
#include "message_type_support.hpp"
Expand Down Expand Up @@ -109,6 +111,7 @@ class rmw_publisher_data_t final
// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
MessageTypeSupport * type_support;

// Context for memory allocation for messages.
Expand Down Expand Up @@ -174,6 +177,7 @@ class rmw_subscription_data_t final

const void * type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
MessageTypeSupport * type_support;
rmw_context_t * context;

Expand Down Expand Up @@ -245,6 +249,7 @@ class rmw_service_data_t final
const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

Expand Down Expand Up @@ -316,6 +321,7 @@ class rmw_client_data_t final
const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

Expand Down
180 changes: 144 additions & 36 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,47 @@

namespace
{
//==============================================================================
// Helper function to create a copy of a string after removing any
// leading or trailing slashes.
std::string strip_slashes(const char * const str)
{
std::string ret = std::string(str);
const std::size_t len = strlen(str);
std::size_t start = 0;
std::size_t end = len - 1;
if (str[0] == '/') {
++start;
}
if (str[end] == '/') {
--end;
}
return ret.substr(start, end - start + 1);
}

//==============================================================================
// A function to take ros topic names and convert them to valid Zenoh keys.
// A function that generates a key expression for message transport of the format
// <ros_domain_id>/<topic_name>/<topic_type>/<topic_hash>
// In particular, Zenoh keys cannot start or end with a /, so this function
// will strip them out.
// The Zenoh key is also prefixed with the ros_domain_id.
// Performance note: at present, this function allocates a new string and copies
// the old string into it. If this becomes a performance problem, we could consider
// modifying the topic_name in place. But this means we need to be much more
// careful about who owns the string.
z_owned_keyexpr_t ros_topic_name_to_zenoh_key(const char * const topic_name, size_t domain_id)
{
const std::string keyexpr_str = std::to_string(domain_id) +
"/" +
rmw_zenoh_cpp::liveliness::mangle_name(topic_name);
z_owned_keyexpr_t ros_topic_name_to_zenoh_key(
const std::size_t domain_id,
const char * const topic_name,
const char * const topic_type,
const char * const type_hash)
{
std::string keyexpr_str = std::to_string(domain_id);
keyexpr_str += "/";
keyexpr_str += strip_slashes(topic_name);
keyexpr_str += "/";
keyexpr_str += topic_type;
keyexpr_str += "/";
keyexpr_str += type_hash;

return z_keyexpr_new(keyexpr_str.c_str());
}

Expand Down Expand Up @@ -535,6 +561,7 @@ rmw_create_publisher(
RMW_ZENOH_DEFAULT_HISTORY_DEPTH;

publisher_data->typesupport_identifier = type_support->typesupport_identifier;
publisher_data->type_hash = type_support->get_type_hash_func(type_support);
publisher_data->type_support_impl = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
publisher_data->type_support = static_cast<rmw_zenoh_cpp::MessageTypeSupport *>(
Expand Down Expand Up @@ -576,8 +603,27 @@ rmw_create_publisher(
allocator->deallocate(const_cast<char *>(rmw_publisher->topic_name), allocator->state);
});

// Convert the type hash to a string so that it can be included in
// the keyexpr.
char * type_hash_c_str = nullptr;
rcutils_ret_t stringify_ret = rosidl_stringify_type_hash(
publisher_data->type_hash,
*allocator,
&type_hash_c_str);
if (RCUTILS_RET_BAD_ALLOC == stringify_ret) {
RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str.");
return nullptr;
}
auto free_type_hash_c_str = rcpputils::make_scope_exit(
[&allocator, &type_hash_c_str]() {
allocator->deallocate(type_hash_c_str, allocator->state);
});

z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key(
topic_name, node->context->actual_domain_id);
node->context->actual_domain_id,
topic_name,
publisher_data->type_support->get_name(),
type_hash_c_str);
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
[&keyexpr]() {
z_keyexpr_drop(z_move(keyexpr));
Expand Down Expand Up @@ -1325,6 +1371,7 @@ rmw_create_subscription(
RMW_ZENOH_DEFAULT_HISTORY_DEPTH;

sub_data->typesupport_identifier = type_support->typesupport_identifier;
sub_data->type_hash = type_support->get_type_hash_func(type_support);
sub_data->type_support_impl = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
sub_data->type_support = static_cast<rmw_zenoh_cpp::MessageTypeSupport *>(
Expand Down Expand Up @@ -1368,12 +1415,30 @@ rmw_create_subscription(
rmw_subscription->can_loan_messages = false;
rmw_subscription->is_cft_enabled = false;

// Convert the type hash to a string so that it can be included in
// the keyexpr.
char * type_hash_c_str = nullptr;
rcutils_ret_t stringify_ret = rosidl_stringify_type_hash(
sub_data->type_hash,
*allocator,
&type_hash_c_str);
if (RCUTILS_RET_BAD_ALLOC == stringify_ret) {
RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str.");
return nullptr;
}
auto free_type_hash_c_str = rcpputils::make_scope_exit(
[&allocator, &type_hash_c_str]() {
allocator->deallocate(type_hash_c_str, allocator->state);
});

// Everything above succeeded and is setup properly. Now declare a subscriber
// with Zenoh; after this, callbacks may come in at any time.

z_owned_closure_sample_t callback = z_closure(rmw_zenoh_cpp::sub_data_handler, nullptr, sub_data);
z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key(
topic_name, node->context->actual_domain_id);
node->context->actual_domain_id,
topic_name,
sub_data->type_support->get_name(),
type_hash_c_str);
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
[&keyexpr]() {
z_keyexpr_drop(z_move(keyexpr));
Expand Down Expand Up @@ -2007,6 +2072,7 @@ rmw_create_client(

client_data->context = node->context;
client_data->typesupport_identifier = type_support->typesupport_identifier;
client_data->type_hash = type_support->get_type_hash_func(type_support);
client_data->request_type_support_impl = request_members;
client_data->response_type_support_impl = response_members;

Expand Down Expand Up @@ -2072,17 +2138,6 @@ rmw_create_client(
allocator->deallocate(const_cast<char *>(rmw_client->service_name), allocator->state);
});

client_data->keyexpr = ros_topic_name_to_zenoh_key(
rmw_client->service_name, node->context->actual_domain_id);
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[client_data]() {
z_keyexpr_drop(z_move(client_data->keyexpr));
});
if (!z_keyexpr_check(&client_data->keyexpr)) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return nullptr;
}

// Note: Service request/response types will contain a suffix Request_ or Response_.
// We remove the suffix when appending the type to the liveliness tokens for
// better reusability within GraphCache.
Expand All @@ -2097,6 +2152,37 @@ rmw_create_client(
service_type.c_str(), rmw_client->service_name);
return nullptr;
}

// Convert the type hash to a string so that it can be included in
// the keyexpr.
char * type_hash_c_str = nullptr;
rcutils_ret_t stringify_ret = rosidl_stringify_type_hash(
client_data->type_hash,
*allocator,
&type_hash_c_str);
if (RCUTILS_RET_BAD_ALLOC == stringify_ret) {
RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str.");
return nullptr;
}
auto free_type_hash_c_str = rcpputils::make_scope_exit(
[&allocator, &type_hash_c_str]() {
allocator->deallocate(type_hash_c_str, allocator->state);
});

client_data->keyexpr = ros_topic_name_to_zenoh_key(
node->context->actual_domain_id,
rmw_client->service_name,
service_type.c_str(),
type_hash_c_str);
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[client_data]() {
z_keyexpr_drop(z_move(client_data->keyexpr));
});
if (!z_keyexpr_check(&client_data->keyexpr)) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return nullptr;
}

client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make(
z_info_zid(z_loan(node->context->impl->session)),
std::to_string(node_data->id),
Expand Down Expand Up @@ -2551,6 +2637,7 @@ rmw_create_service(

service_data->context = node->context;
service_data->typesupport_identifier = type_support->typesupport_identifier;
service_data->type_hash = type_support->get_type_hash_func(type_support);
service_data->request_type_support_impl = request_members;
service_data->response_type_support_impl = response_members;

Expand Down Expand Up @@ -2611,8 +2698,43 @@ rmw_create_service(
[rmw_service, allocator]() {
allocator->deallocate(const_cast<char *>(rmw_service->service_name), allocator->state);
});

// Note: Service request/response types will contain a suffix Request_ or Response_.
// We remove the suffix when appending the type to the liveliness tokens for
// better reusability within GraphCache.
std::string service_type = service_data->response_type_support->get_name();
size_t suffix_substring_position = service_type.find("Response_");
if (std::string::npos != suffix_substring_position) {
service_type = service_type.substr(0, suffix_substring_position);
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unexpected type %s for service %s. Report this bug",
service_type.c_str(), rmw_service->service_name);
return nullptr;
}

// Convert the type hash to a string so that it can be included in
// the keyexpr.
char * type_hash_c_str = nullptr;
rcutils_ret_t stringify_ret = rosidl_stringify_type_hash(
service_data->type_hash,
*allocator,
&type_hash_c_str);
if (RCUTILS_RET_BAD_ALLOC == stringify_ret) {
RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str.");
return nullptr;
}
auto free_type_hash_c_str = rcpputils::make_scope_exit(
[&allocator, &type_hash_c_str]() {
allocator->deallocate(type_hash_c_str, allocator->state);
});

service_data->keyexpr = ros_topic_name_to_zenoh_key(
rmw_service->service_name, node->context->actual_domain_id);
node->context->actual_domain_id,
rmw_service->service_name,
service_type.c_str(),
type_hash_c_str);
auto free_ros_keyexpr = rcpputils::make_scope_exit(
[service_data]() {
if (service_data) {
Expand Down Expand Up @@ -2645,20 +2767,6 @@ rmw_create_service(
z_undeclare_queryable(z_move(service_data->qable));
});

// Note: Service request/response types will contain a suffix Request_ or Response_.
// We remove the suffix when appending the type to the liveliness tokens for
// better reusability within GraphCache.
std::string service_type = service_data->response_type_support->get_name();
size_t suffix_substring_position = service_type.find("Response_");
if (std::string::npos != suffix_substring_position) {
service_type = service_type.substr(0, suffix_substring_position);
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unexpected type %s for service %s. Report this bug",
service_type.c_str(), rmw_service->service_name);
return nullptr;
}
service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make(
z_info_zid(z_loan(node->context->impl->session)),
std::to_string(node_data->id),
Expand Down

0 comments on commit 08704af

Please sign in to comment.