From 2d4bac73ed8e3c1f25dd9a9b2c6a18b3bc8b8021 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 16 Feb 2024 05:18:00 -0500 Subject: [PATCH 1/3] More cleanups around the code. (#109) * Use a keyexpression slice. That way we don't have to do an allocation here. Signed-off-by: Chris Lalancette * Cleanup graph_sub_data_handler. Get rid of some unnecessary operations, and be sure to always drop the keyexpr, even on error. Signed-off-by: Chris Lalancette * Remove unnecessary comment. Signed-off-by: Chris Lalancette * Remove unnecessary malloc/memcpy/free. Signed-off-by: Chris Lalancette * Switch to z_undeclare_queryable. It's just the counterpart to z_declare_queryable. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_init.cpp | 22 ++++++++-------------- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 32 ++++++++++---------------------- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 85debdec..5e0310a4 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -45,8 +45,13 @@ static void graph_sub_data_handler( const z_sample_t * sample, void * data) { - (void)data; + static_cast(data); + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); // Get the context impl from data. rmw_context_impl_s * context_impl = static_cast( @@ -59,21 +64,16 @@ static void graph_sub_data_handler( return; } - // TODO(Yadunund): Avoid this copy. - std::string keyexpr_str(keystr._cstr); - switch (sample->kind) { case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - context_impl->graph_cache.parse_put(keyexpr_str); + context_impl->graph_cache.parse_put(keystr._cstr); break; case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - context_impl->graph_cache.parse_del(keyexpr_str); + context_impl->graph_cache.parse_del(keystr._cstr); break; default: break; } - - z_drop(z_move(keystr)); } //============================================================================== @@ -269,12 +269,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(channel.send), NULL); - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // z_get_options_t opts = z_get_options_default(); - // z_get( - // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), - // &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); call_success = z_call(channel.recv, &reply)) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 2ffd5600..5b2f19b3 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -78,8 +78,7 @@ namespace // the old string into it. If this becomes a performance problem, we could consider // modifying the topic_name in place. But this means we need to be much more // careful about who owns the string. -z_owned_keyexpr_t ros_topic_name_to_zenoh_key( - const char * const topic_name, size_t domain_id, rcutils_allocator_t * allocator) +z_owned_keyexpr_t ros_topic_name_to_zenoh_key(const char * const topic_name, size_t domain_id) { std::string d = std::to_string(domain_id); @@ -98,17 +97,9 @@ z_owned_keyexpr_t ros_topic_name_to_zenoh_key( } } - char * stripped_topic_name = rcutils_strndup( - &topic_name[start_offset], end_offset - start_offset, *allocator); - if (stripped_topic_name == nullptr) { - return z_keyexpr_null(); - } - - z_owned_keyexpr_t ret = z_keyexpr_join(z_keyexpr(d.c_str()), z_keyexpr(stripped_topic_name)); - - allocator->deallocate(stripped_topic_name, allocator->state); - - return ret; + return z_keyexpr_join( + z_keyexpr(d.c_str()), + zc_keyexpr_from_slice(&topic_name[start_offset], end_offset - start_offset)); } //============================================================================== @@ -556,7 +547,7 @@ rmw_create_publisher( }); z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - topic_name, node->context->actual_domain_id, allocator); + topic_name, node->context->actual_domain_id); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -1277,7 +1268,7 @@ rmw_create_subscription( z_owned_closure_sample_t callback = z_closure(sub_data_handler, nullptr, sub_data); z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - topic_name, node->context->actual_domain_id, allocator); + topic_name, node->context->actual_domain_id); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -1911,7 +1902,7 @@ rmw_create_client( }); client_data->keyexpr = ros_topic_name_to_zenoh_key( - rmw_client->service_name, node->context->actual_domain_id, allocator); + rmw_client->service_name, node->context->actual_domain_id); auto free_ros_keyexpr = rcpputils::make_scope_exit( [client_data]() { z_keyexpr_drop(z_move(client_data->keyexpr)); @@ -2064,13 +2055,10 @@ static z_owned_bytes_map_t create_map_and_set_sequence_num( z_bytes_t guid_bytes; guid_bytes.len = RMW_GID_STORAGE_SIZE; - guid_bytes.start = static_cast(malloc(RMW_GID_STORAGE_SIZE)); - memcpy(static_cast(const_cast(guid_bytes.start)), guid, RMW_GID_STORAGE_SIZE); + guid_bytes.start = guid; z_bytes_map_insert_by_copy(&map, z_bytes_new("client_guid"), guid_bytes); - free(const_cast(guid_bytes.start)); - free_attachment_map.cancel(); return map; @@ -2550,7 +2538,7 @@ rmw_create_service( allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); }); service_data->keyexpr = ros_topic_name_to_zenoh_key( - rmw_service->service_name, node->context->actual_domain_id, allocator); + rmw_service->service_name, node->context->actual_domain_id); auto free_ros_keyexpr = rcpputils::make_scope_exit( [service_data]() { if (service_data) { @@ -2673,7 +2661,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); - z_drop(z_move(service_data->qable)); + z_undeclare_queryable(z_move(service_data->qable)); z_drop(z_move(service_data->token)); RMW_TRY_DESTRUCTOR( From e77082fc1240a2a0a547ecd7745a733a1924c299 Mon Sep 17 00:00:00 2001 From: Yadu Date: Mon, 19 Feb 2024 13:04:55 +0800 Subject: [PATCH 2/3] Listen on all connections to 7447 port (#112) Signed-off-by: Yadunund --- rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index cc4825bc..c08213e6 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -24,7 +24,7 @@ /// peers, or client can use to establish a zenoh session. listen: { endpoints: [ - "tcp/localhost:7447" + "tcp/[::]:7447" ], }, /// Configure the scouting mechanisms and their behaviours From 08f5c302e3828e8ab73346e36cb0c5d3fef232ee Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 19 Feb 2024 00:23:41 -0500 Subject: [PATCH 3/3] Implement __rmw_take_serialized. (#113) * Implement __rmw_take_serialized. This allows us to implement rmw_take_serialized_message and rmw_take_serialize_message_with_info, both of which are required for rosbag2. Note that this implementation copies some parts of __rmw_take. Given the different types involved (void * ros_message vs. rmw_serialize_message_t *), I didn't see a readable way to reduce this duplication. Signed-off-by: Chris Lalancette * Style Signed-off-by: Yadunund --------- Signed-off-by: Chris Lalancette Signed-off-by: Yadunund Co-authored-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 78 +++++++++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 9 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5b2f19b3..a2863270 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1634,6 +1634,69 @@ rmw_take_sequence( return RMW_RET_UNSUPPORTED; } +//============================================================================== +static rmw_ret_t __rmw_take_serialized( + const rmw_subscription_t * subscription, + rmw_serialized_message_t * serialized_message, + bool * taken, + rmw_message_info_t * message_info) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(serialized_message, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + *taken = false; + + auto sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + + if (sub_data->context->impl->is_shutdown) { + return RMW_RET_OK; + } + + // RETRIEVE SERIALIZED MESSAGE =============================================== + + std::unique_ptr msg_data = sub_data->pop_next_message(); + if (msg_data == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + + if (serialized_message->buffer_capacity < msg_data->payload.payload.len) { + rmw_ret_t ret = + rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len); + if (ret != RMW_RET_OK) { + return ret; // Error message already set + } + } + serialized_message->buffer_length = msg_data->payload.payload.len; + memcpy( + serialized_message->buffer, msg_data->payload.payload.start, + msg_data->payload.payload.len); + + *taken = true; + + // TODO(clalancette): fill in source_timestamp + message_info->source_timestamp = 0; + message_info->received_timestamp = msg_data->recv_timestamp; + // TODO(clalancette): fill in publication_sequence_number + message_info->publication_sequence_number = 0; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, 16); + message_info->from_intra_process = false; + + return RMW_RET_OK; +} + //============================================================================== /// Take an incoming ROS message as a byte stream. rmw_ret_t @@ -1643,11 +1706,11 @@ rmw_take_serialized_message( bool * taken, rmw_subscription_allocation_t * allocation) { - static_cast(subscription); - static_cast(serialized_message); - static_cast(taken); static_cast(allocation); - return RMW_RET_UNSUPPORTED; + + rmw_message_info_t dummy_msg_info; + + return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info); } //============================================================================== @@ -1660,12 +1723,9 @@ rmw_take_serialized_message_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - static_cast(subscription); - static_cast(serialized_message); - static_cast(taken); - static_cast(message_info); static_cast(allocation); - return RMW_RET_UNSUPPORTED; + + return __rmw_take_serialized(subscription, serialized_message, taken, message_info); } //==============================================================================