From 3b6118ba3f3dcf8559bec989642708b402e95bda Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 27 Dec 2023 17:58:11 +0800 Subject: [PATCH 1/5] Rely on channels for sending requests Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 124 +++++++-------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 11 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 159 ++++++++++++-------- 3 files changed, 166 insertions(+), 128 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 9e795251..718f22d8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -124,65 +124,65 @@ void service_data_handler(const z_query_t * query, void * service_data) z_drop(z_move(keystr)); } -void client_data_handler(z_owned_reply_t * reply, void * client_data) -{ - auto rmw_client_data = static_cast(client_data); - if (rmw_client_data == nullptr) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain rmw_client_data_t " - ); - return; - } - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[client_data_handler] triggered for %s", - rmw_client_data->service_name - ); - if (!z_check(*reply)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_check returned False" - ); - return; - } - if (!z_reply_check(reply)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_check returned False" - ); - return; - } - if (!z_reply_is_ok(reply)) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False" - ); - return; - } - - z_sample_t sample = z_reply_ok(reply); - - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "[client_data_handler] keyexpr of sample: %s", - z_loan(keystr) - ); - - { - std::lock_guard msg_lock(rmw_client_data->message_mutex); - rmw_client_data->message = std::make_unique( - zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); - } - { - std::lock_guard internal_lock(rmw_client_data->internal_mutex); - if (rmw_client_data->condition != nullptr) { - rmw_client_data->condition->notify_one(); - } - } - - z_reply_drop(reply); - z_drop(z_move(keystr)); -} +// void client_data_handler(z_owned_reply_t * reply, void * client_data) +// { +// auto rmw_client_data = static_cast(client_data); +// if (rmw_client_data == nullptr) { +// RCUTILS_LOG_ERROR_NAMED( +// "rmw_zenoh_cpp", +// "Unable to obtain rmw_client_data_t " +// ); +// return; +// } +// RCUTILS_LOG_INFO_NAMED( +// "rmw_zenoh_cpp", +// "[client_data_handler] triggered for %s", +// rmw_client_data->service_name +// ); +// if (!z_check(*reply)) { +// RCUTILS_LOG_ERROR_NAMED( +// "rmw_zenoh_cpp", +// "z_check returned False" +// ); +// return; +// } +// if (!z_reply_check(reply)) { +// RCUTILS_LOG_ERROR_NAMED( +// "rmw_zenoh_cpp", +// "z_reply_check returned False" +// ); +// return; +// } +// if (!z_reply_is_ok(reply)) { +// RCUTILS_LOG_ERROR_NAMED( +// "rmw_zenoh_cpp", +// "z_reply_is_ok returned False" +// ); +// return; +// } + +// z_sample_t sample = z_reply_ok(reply); + +// z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + +// RCUTILS_LOG_DEBUG_NAMED( +// "rmw_zenoh_cpp", +// "[client_data_handler] keyexpr of sample: %s", +// z_loan(keystr) +// ); + +// { +// std::lock_guard msg_lock(rmw_client_data->message_mutex); +// rmw_client_data->message = std::make_unique( +// zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); +// } +// { +// std::lock_guard internal_lock(rmw_client_data->internal_mutex); +// if (rmw_client_data->condition != nullptr) { +// rmw_client_data->condition->notify_one(); +// } +// } + +// z_reply_drop(reply); +// z_drop(z_move(keystr)); +// } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 0450f69c..8dbbf069 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -135,7 +135,7 @@ struct rmw_subscription_data_t // z_owned_closure_query_t void service_data_handler(const z_query_t * query, void * service_data); -void client_data_handler(z_owned_reply_t * reply, void * client_data); +// void client_data_handler(z_owned_reply_t * reply, void * client_data); ///============================================================================== @@ -172,12 +172,15 @@ struct rmw_service_data_t struct rmw_client_data_t { - const char * service_name; + // const char * service_name; + z_owned_keyexpr_t keyexpr; - z_owned_closure_reply_t zn_closure_reply; + // z_owned_closure_reply_t zn_closure_reply; + z_owned_reply_channel_t channel; std::mutex message_mutex; - std::unique_ptr message; + std::vector replies; + // std::unique_ptr message; const void * request_type_support_impl; const void * response_type_support_impl; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 65165f0e..6ddd723b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1739,6 +1739,13 @@ rmw_create_client( allocator->deallocate(client_data, allocator->state); }); + RMW_TRY_PLACEMENT_NEW(client_data, client_data, return nullptr, rmw_client_data_t); + auto destruct_client_data = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->~rmw_client_data_t(), + rmw_client_data_t); + }); // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); @@ -1809,43 +1816,39 @@ rmw_create_client( }); // Populate the rmw_client. - rmw_client->data = client_data; rmw_client->implementation_identifier = rmw_zenoh_identifier; - rmw_client->service_name = rcutils_strdup(service_name, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, "failed to allocate client name", return nullptr); - auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Zenoh implementation for the client - - // TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function. - // If this is enough simply rename the method. - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + client_data->keyexpr = ros_topic_name_to_zenoh_key( rmw_client->service_name, node->context->actual_domain_id, allocator); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_keyexpr_drop(z_move(client_data->keyexpr)); }); - if (!z_keyexpr_check(&keyexpr)) { + if (!z_keyexpr_check(&client_data->keyexpr)) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + rmw_client->data = client_data; + free_rmw_client.cancel(); free_client_data.cancel(); free_request_type_support.cancel(); destruct_request_type_support.cancel(); free_response_type_support.cancel(); + destruct_client_data.cancel(); destruct_response_type_support.cancel(); free_service_name.cancel(); + free_ros_keyexpr.cancel(); return rmw_client; } @@ -1855,11 +1858,10 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - RCUTILS_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "[rmw_destroy_client] %s", client->service_name); - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1873,14 +1875,20 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) rcutils_allocator_t * allocator = &node->context->options.allocator; - auto client_data = static_cast(client->data); + rmw_client_data_t * client_data = static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, - "client implementation pointer is null", + "client implementation pointer is null.", return RMW_RET_INVALID_ARGUMENT); // CLEANUP =================================================================== - z_drop(z_move(client_data->zn_closure_reply)); + // z_drop(z_move(client_data->zn_closure_reply)); + z_drop(z_move(client_data->channel)); + z_drop(z_move(client_data->keyexpr)); + for(z_owned_reply_t & reply : client_data->replies) { + z_reply_drop(&reply); + } + client_data->replies.clear(); allocator->deallocate(client_data->request_type_support, allocator->state); allocator->deallocate(client_data->response_type_support, allocator->state); @@ -1889,8 +1897,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_destroy_client] %s FINISHED", client->service_name); return RMW_RET_OK; } @@ -1904,24 +1910,22 @@ rmw_send_request( { RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", "[rmw_send_request]"); - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + rmw_client_data_t * client_data = static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( - client->data, - "client implementation pointer is null", + client_data, + "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - auto * client_data = static_cast(client->data); - rmw_context_impl_s * context_impl = static_cast( client_data->context->impl); @@ -1967,24 +1971,51 @@ rmw_send_request( // Send request z_get_options_t opts = z_get_options_default(); + opts.target = Z_QUERY_TARGET_ALL; opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - client->service_name, client_data->context->actual_domain_id, allocator); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return RMW_RET_ERROR; - } + // z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + // client->service_name, client_data->context->actual_domain_id, allocator); + // auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + // [&keyexpr]() { + // z_keyexpr_drop(z_move(keyexpr)); + // }); + // if (!z_keyexpr_check(&keyexpr)) { + // RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + // return RMW_RET_ERROR; + // } + + // client_data->service_name = client->service_name; - client_data->service_name = client->service_name; - client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); + // client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); - z_get(z_loan(context_impl->session), z_loan(keyexpr), "", &client_data->zn_closure_reply, &opts); + // z_get(z_loan(context_impl->session), z_loan(keyexpr), "", &client_data->zn_closure_reply, &opts); + + client_data->channel = zc_reply_non_blocking_fifo_new(16); + z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", z_move(client_data->channel.send), + &opts); // here, the send is moved and will be dropped by zenoh when adequate + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(client_data->channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(client_data->channel.recv, &reply)) { + if (!call_success) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "[rmw_send_request] call unsuccessful"); + continue; + } + if (z_reply_is_ok(&reply)) { + client_data->replies.push_back(std::move(reply)); + std::lock_guard internal_lock(client_data->internal_mutex); + if (client_data->condition != nullptr) { + client_data->condition->notify_one(); + } + // reply = z_reply_null(); + } else { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", "[rmw_send_request] z_reply is not ok"); + return RMW_RET_ERROR; + } + } return RMW_RET_OK; } @@ -2002,41 +2033,39 @@ rmw_take_response( RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_FOR_NULL_WITH_MSG( - client->data, "client implementation pointer is null", RMW_RET_INVALID_ARGUMENT); - - auto client_data = static_cast(client->data); - - std::unique_ptr msg_data = nullptr; + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - { - std::lock_guard lock(client_data->message_mutex); - if (client_data->message == nullptr) { - // TODO(francocipollone): Verify behavior. - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); - return RMW_RET_ERROR; - } + z_owned_reply_t * latest_reply = nullptr; - msg_data = std::move(client_data->message); - client_data->message.release(); + std::lock_guard lock(client_data->message_mutex); + if (client_data->replies.empty()) { + // TODO(francocipollone): Verify behavior. + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; } + latest_reply = &client_data->replies.back(); + // msg_data = std::move(client_data->message); + // client_data->message.release(); + + z_sample_t sample = z_reply_ok(latest_reply); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(msg_data->payload.payload.start)), - msg_data->payload.payload.len); + reinterpret_cast(const_cast(sample.payload.start)), + sample.payload.len); // Object that serializes the data eprosima::fastcdr::Cdr deser( @@ -2053,7 +2082,12 @@ rmw_take_response( } *taken = true; - zc_payload_drop(&(msg_data->payload)); + + for(z_owned_reply_t & reply : client_data->replies) { + z_reply_drop(&reply); + } + client_data->replies.clear(); + // zc_payload_drop(&(msg_data->payload)); // TODO(francocipollone): Verify request_header information. request_header->request_id.sequence_number = 0; @@ -2807,7 +2841,7 @@ rmw_wait( // Go through each of the clients and attach the wait set condition variable to them. // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < clients->client_count; ++i) { - auto client_data = static_cast(clients->clients[i]); + rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { client_data->condition = &wait_set_data->condition_variable; } @@ -2876,12 +2910,13 @@ rmw_wait( if (clients) { // Now detach the condition variable and mutex from each of the clients for (size_t i = 0; i < clients->client_count; ++i) { - auto client_data = static_cast(clients->clients[i]); + rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { client_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (client_data->message == nullptr) { + if (client_data->replies.empty()) { + printf("client replies are empty!!"); // Setting to nullptr lets rcl know that this client is not ready clients->clients[i] = nullptr; } From 3c24476ae9a17ffb7d95a1722edb4273ab258d52 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 27 Dec 2023 18:34:09 +0800 Subject: [PATCH 2/5] Revert to callback for client with fixes Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 126 ++++++++++---------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 6 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 62 +++++----- 3 files changed, 99 insertions(+), 95 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 718f22d8..80e45390 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -124,65 +124,67 @@ void service_data_handler(const z_query_t * query, void * service_data) z_drop(z_move(keystr)); } -// void client_data_handler(z_owned_reply_t * reply, void * client_data) -// { -// auto rmw_client_data = static_cast(client_data); -// if (rmw_client_data == nullptr) { -// RCUTILS_LOG_ERROR_NAMED( -// "rmw_zenoh_cpp", -// "Unable to obtain rmw_client_data_t " -// ); -// return; -// } -// RCUTILS_LOG_INFO_NAMED( -// "rmw_zenoh_cpp", -// "[client_data_handler] triggered for %s", -// rmw_client_data->service_name -// ); -// if (!z_check(*reply)) { -// RCUTILS_LOG_ERROR_NAMED( -// "rmw_zenoh_cpp", -// "z_check returned False" -// ); -// return; -// } -// if (!z_reply_check(reply)) { -// RCUTILS_LOG_ERROR_NAMED( -// "rmw_zenoh_cpp", -// "z_reply_check returned False" -// ); -// return; -// } -// if (!z_reply_is_ok(reply)) { -// RCUTILS_LOG_ERROR_NAMED( -// "rmw_zenoh_cpp", -// "z_reply_is_ok returned False" -// ); -// return; -// } - -// z_sample_t sample = z_reply_ok(reply); - -// z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - -// RCUTILS_LOG_DEBUG_NAMED( -// "rmw_zenoh_cpp", -// "[client_data_handler] keyexpr of sample: %s", -// z_loan(keystr) -// ); - -// { -// std::lock_guard msg_lock(rmw_client_data->message_mutex); -// rmw_client_data->message = std::make_unique( -// zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); -// } -// { -// std::lock_guard internal_lock(rmw_client_data->internal_mutex); -// if (rmw_client_data->condition != nullptr) { -// rmw_client_data->condition->notify_one(); -// } -// } - -// z_reply_drop(reply); -// z_drop(z_move(keystr)); -// } +void client_data_handler(z_owned_reply_t * reply, void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + // RCUTILS_LOG_INFO_NAMED( + // "rmw_zenoh_cpp", + // "[client_data_handler] triggered for %s", + // client_data->service_name + // ); + // if (!z_check(*reply)) { + // RCUTILS_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "z_check returned False" + // ); + // return; + // } + if (!z_reply_check(reply)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False" + ); + return; + } + + // z_sample_t sample = z_reply_ok(reply); + + // z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + + // RCUTILS_LOG_DEBUG_NAMED( + // "rmw_zenoh_cpp", + // "[client_data_handler] keyexpr of sample: %s", + // z_loan(keystr) + // ); + + { + std::lock_guard msg_lock(client_data->message_mutex); + // client_data->message = std::make_unique( + // zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); + // Take ownership of the reply. + client_data->replies.emplace_back(*reply); + *reply = z_reply_null(); + } + { + std::lock_guard internal_lock(client_data->internal_mutex); + if (client_data->condition != nullptr) { + client_data->condition->notify_one(); + } + } + + // z_drop(z_move(keystr)); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 8dbbf069..9e9b8253 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -135,7 +135,7 @@ struct rmw_subscription_data_t // z_owned_closure_query_t void service_data_handler(const z_query_t * query, void * service_data); -// void client_data_handler(z_owned_reply_t * reply, void * client_data); +void client_data_handler(z_owned_reply_t * reply, void * client_data); ///============================================================================== @@ -175,8 +175,8 @@ struct rmw_client_data_t // const char * service_name; z_owned_keyexpr_t keyexpr; - // z_owned_closure_reply_t zn_closure_reply; - z_owned_reply_channel_t channel; + z_owned_closure_reply_t zn_closure_reply; + // z_owned_reply_channel_t channel; std::mutex message_mutex; std::vector replies; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6ddd723b..445084db 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1882,8 +1882,8 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_INVALID_ARGUMENT); // CLEANUP =================================================================== - // z_drop(z_move(client_data->zn_closure_reply)); - z_drop(z_move(client_data->channel)); + z_drop(z_move(client_data->zn_closure_reply)); + // z_drop(z_move(client_data->channel)); z_drop(z_move(client_data->keyexpr)); for(z_owned_reply_t & reply : client_data->replies) { z_reply_drop(&reply); @@ -1988,35 +1988,37 @@ rmw_send_request( // client_data->service_name = client->service_name; - // client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); - - // z_get(z_loan(context_impl->session), z_loan(keyexpr), "", &client_data->zn_closure_reply, &opts); - - client_data->channel = zc_reply_non_blocking_fifo_new(16); - z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", z_move(client_data->channel.send), - &opts); // here, the send is moved and will be dropped by zenoh when adequate - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(client_data->channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(client_data->channel.recv, &reply)) { - if (!call_success) { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "[rmw_send_request] call unsuccessful"); - continue; - } - if (z_reply_is_ok(&reply)) { - client_data->replies.push_back(std::move(reply)); - std::lock_guard internal_lock(client_data->internal_mutex); - if (client_data->condition != nullptr) { - client_data->condition->notify_one(); - } - // reply = z_reply_null(); - } else { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "[rmw_send_request] z_reply is not ok"); - return RMW_RET_ERROR; - } - } + client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); + + z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); + + // z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16); + // z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", z_move(channel.send), + // &opts); // here, the send is moved and will be dropped by zenoh when adequate + // z_owned_reply_t reply = z_reply_null(); + // for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + // call_success = z_call(channel.recv, &reply)) { + // if (!call_success) { + // RCUTILS_LOG_WARN_NAMED( + // "rmw_zenoh_cpp", "[rmw_send_request] call unsuccessful"); + // continue; + // } + // if (z_reply_is_ok(&reply)) { + // client_data->replies.push_back(std::move(reply)); + // // reply = z_reply_null(); + // } else { + // RCUTILS_LOG_WARN_NAMED( + // "rmw_zenoh_cpp", "[rmw_send_request] z_reply is not ok"); + // return RMW_RET_ERROR; + // } + // } + // std::lock_guard internal_lock(client_data->internal_mutex); + // if (client_data->condition != nullptr) { + // client_data->condition->notify_one(); + // } + // z_drop(z_move(channel)); + // z_reply_drop(&reply); return RMW_RET_OK; } From 00cc7c0672cacc56cb01fb6f85d22be85a6d1c9e Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 27 Dec 2023 21:12:56 +0800 Subject: [PATCH 3/5] Cleanup service cb Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 34 ++++--- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 10 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 105 +++++++++----------- 3 files changed, 72 insertions(+), 77 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 80e45390..2d4b4a49 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -22,13 +22,13 @@ #include "rmw_data_types.hpp" ///============================================================================== - saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) : payload(p), recv_timestamp(recv_ts) { memcpy(publisher_gid, pub_gid, 16); } +//============================================================================== void sub_data_handler( const z_sample_t * sample, void * data) @@ -79,12 +79,14 @@ void sub_data_handler( } -unsigned int rmw_service_data_t::get_new_uid() +//============================================================================== +std::size_t rmw_service_data_t::get_new_uid() { return client_count++; } -void service_data_handler(const z_query_t * query, void * service_data) +//============================================================================== +void service_data_handler(const z_query_t * query, void * data) { RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -92,8 +94,8 @@ void service_data_handler(const z_query_t * query, void * service_data) ); z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); - auto rmw_service_data = static_cast(service_data); - if (rmw_service_data == nullptr) { + rmw_service_data_t * service_data = static_cast(data); + if (service_data == nullptr) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain rmw_service_data_t from data for " @@ -106,24 +108,24 @@ void service_data_handler(const z_query_t * query, void * service_data) // Get the query parameters and payload { - std::lock_guard lock(rmw_service_data->query_queue_mutex); - - const unsigned int client_id = rmw_service_data->get_new_uid(); - rmw_service_data->id_query_map.emplace( - std::make_pair(client_id, std::make_unique(z_query_clone(query)))); - rmw_service_data->to_take.push_back(client_id); - - + std::lock_guard lock(service_data->query_queue_mutex); + const std::size_t client_id = service_data->get_new_uid(); + service_data->id_query_map.emplace( + std::make_pair(client_id, z_query_clone(query))); + service_data->to_take.push_back(client_id); + } + { // Since we added new data, trigger the guard condition if it is available - std::lock_guard internal_lock(rmw_service_data->internal_mutex); - if (rmw_service_data->condition != nullptr) { - rmw_service_data->condition->notify_one(); + std::lock_guard internal_lock(service_data->internal_mutex); + if (service_data->condition != nullptr) { + service_data->condition->notify_one(); } } z_drop(z_move(keystr)); } +//============================================================================== void client_data_handler(z_owned_reply_t * reply, void * data) { auto client_data = static_cast(data); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 9e9b8253..05c2793b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -142,9 +142,9 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data); struct rmw_service_data_t { - unsigned int get_new_uid(); + std::size_t get_new_uid(); - const char * keyexpr; + z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; const void * request_type_support_impl; @@ -157,15 +157,15 @@ struct rmw_service_data_t // Map to store the query id and the query. // The query handler is saved as it is needed to answer the query later on. - std::unordered_map> id_query_map; + std::unordered_map id_query_map; // The query id's of the queries that need to be processed. - std::deque to_take; + std::deque to_take; std::mutex query_queue_mutex; std::mutex internal_mutex; std::condition_variable * condition{nullptr}; - unsigned int client_count{}; + std::size_t client_count = 0; }; ///============================================================================== diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 445084db..443a1058 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2194,7 +2194,6 @@ rmw_create_service( rmw_service, "failed to allocate memory for the service", return nullptr); - auto free_rmw_service = rcpputils::make_scope_exit( [rmw_service, allocator]() { allocator->deallocate(rmw_service, allocator->state); @@ -2243,16 +2242,14 @@ rmw_create_service( // Request type support service_data->request_type_support = static_cast( allocator->allocate(sizeof(RequestTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( service_data->request_type_support, "Failed to allocate RequestTypeSupport", return nullptr); auto free_request_type_support = rcpputils::make_scope_exit( - [service_data, allocator]() { - allocator->deallocate(service_data->request_type_support, allocator->state); + [request_type_support = service_data->request_type_support, allocator]() { + allocator->deallocate(request_type_support, allocator->state); }); - RMW_TRY_PLACEMENT_NEW( service_data->request_type_support, service_data->request_type_support, @@ -2268,16 +2265,14 @@ rmw_create_service( // Response type support service_data->response_type_support = static_cast( allocator->allocate(sizeof(ResponseTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( service_data->response_type_support, "Failed to allocate ResponseTypeSupport", return nullptr); auto free_response_type_support = rcpputils::make_scope_exit( - [service_data, allocator]() { - allocator->deallocate(service_data->response_type_support, allocator->state); + [response_type_support = service_data->response_type_support, allocator]() { + allocator->deallocate(response_type_support, allocator->state); }); - RMW_TRY_PLACEMENT_NEW( service_data->response_type_support, service_data->response_type_support, @@ -2291,46 +2286,34 @@ rmw_create_service( }); // Populate the rmw_service. - rmw_service->data = service_data; rmw_service->implementation_identifier = rmw_zenoh_identifier; - rmw_service->service_name = rcutils_strdup(service_name, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( rmw_service->service_name, "failed to allocate service name", return nullptr); - auto free_service_name = rcpputils::make_scope_exit( [rmw_service, allocator]() { allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); }); - - // Zenoh implementation for the service - - // TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function. - // If this is enough simply rename the method. - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + service_data->keyexpr = ros_topic_name_to_zenoh_key( rmw_service->service_name, node->context->actual_domain_id, allocator); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [service_data]() { + if (service_data) { + z_drop(z_move(service_data->keyexpr)); + } }); - if (!z_keyexpr_check(&keyexpr)) { + if (!z_check(z_loan(service_data->keyexpr))) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - service_data->keyexpr = z_keyexpr_to_string(z_loan(keyexpr))._cstr; - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_create_service] keyexpr: %s", - z_loan(z_keyexpr_to_string(z_loan(keyexpr)))); z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data); service_data->qable = z_declare_queryable( z_loan(context_impl->session), - z_loan(keyexpr), + z_loan(service_data->keyexpr), z_move(callback), nullptr); @@ -2338,7 +2321,6 @@ rmw_create_service( RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } - auto undeclare_z_queryable = rcpputils::make_scope_exit( [service_data]() { z_undeclare_queryable(z_move(service_data->qable)); @@ -2346,6 +2328,8 @@ rmw_create_service( // TODO(francocipollone): Update graph cache. + rmw_service->data = service_data; + free_rmw_service.cancel(); free_service_data.cancel(); free_service_name.cancel(); @@ -2354,6 +2338,7 @@ rmw_create_service( destruct_response_type_support.cancel(); free_request_type_support.cancel(); free_response_type_support.cancel(); + free_ros_keyexpr.cancel(); undeclare_z_queryable.cancel(); return rmw_service; } @@ -2366,6 +2351,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -2379,13 +2365,14 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) rcutils_allocator_t * allocator = &node->context->options.allocator; - auto service_data = static_cast(service->data); + rmw_service_data_t * service_data = static_cast(service->data); RMW_CHECK_FOR_NULL_WITH_MSG( service_data, - "service implementation pointer is null", + "Unable to retrieve service_data from service", return RMW_RET_INVALID_ARGUMENT); // CLEANUP ================================================================ + z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); for (auto & id_query : service_data->id_query_map) { z_drop(z_move(id_query.second)); @@ -2395,7 +2382,8 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) allocator->deallocate(service_data->response_type_support, allocator->state); allocator->deallocate(service->data, allocator->state); - rmw_service_free(service); + allocator->deallocate(const_cast(service->service_name), allocator->state); + allocator->deallocate(service, allocator->state); // TODO(francocipollone): Update graph cache. return RMW_RET_OK; @@ -2414,6 +2402,7 @@ rmw_take_request( RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); @@ -2425,36 +2414,34 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->service_name, "service has no service name", RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_FOR_NULL_WITH_MSG( - service->data, "service implementation pointer is null", RMW_RET_INVALID_ARGUMENT); - auto * service_data = static_cast(service->data); + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); std::unique_lock lock(service_data->query_queue_mutex); - if (service_data->id_query_map.empty()) { // TODO(francocipollone): Verify behavior. RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] Take id_query_map is empty"); return RMW_RET_OK; } - - const unsigned int query_id = service_data->to_take.back(); - const auto query_ret = service_data->id_query_map.find(query_id); - if (query_ret == service_data->id_query_map.end()) { + const std::size_t query_id = service_data->to_take.back(); + auto query_it = service_data->id_query_map.find(query_id); + if (query_it == service_data->id_query_map.end()) { RMW_SET_ERROR_MSG("Query id not found in id_query_map"); return RMW_RET_ERROR; } - const z_owned_query_t * owned_query_ptr = (*query_ret).second.get(); + // const z_owned_query_t * owned_query_ptr = (*query_it).second.get(); // TODO(francocipollone): Remove the query id from the to_take collection service_data->to_take.pop_back(); service_data->query_queue_mutex.unlock(); // DESERIALIZE MESSAGE ======================================================== - if (!z_query_check(owned_query_ptr)) { - RMW_SET_ERROR_MSG("onwed_query_t contains gravestone, can't deserialize message"); - return RMW_RET_ERROR; - } - const z_query_t z_loaned_query = z_query_loan(owned_query_ptr); + // if (!z_query_check(owned_query_ptr)) { + // RMW_SET_ERROR_MSG("onwed_query_t contains gravestone, can't deserialize message"); + // return RMW_RET_ERROR; + // } + const z_query_t z_loaned_query = z_query_loan(&query_it->second); z_value_t payload_value = z_query_value(&z_loaned_query); // Object that manages the raw buffer @@ -2498,6 +2485,7 @@ rmw_send_response( "rmw_zenoh_cpp", "[rmw_send_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); @@ -2509,10 +2497,10 @@ rmw_send_response( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, - "service implementation pointer is null", + "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - auto * service_data = static_cast(service->data); + rmw_service_data_t * service_data = static_cast(service->data); rcutils_allocator_t * allocator = &(service_data->context->options.allocator); @@ -2556,19 +2544,24 @@ rmw_send_response( meta_length); // Create the queryable payload - service_data->query_queue_mutex.lock(); - auto owned_query_ptr = service_data->id_query_map[request_header->sequence_number].get(); - service_data->query_queue_mutex.unlock(); + std::lock_guard lock(service_data->query_queue_mutex); + auto query_it = service_data->id_query_map.find(request_header->sequence_number); + if (query_it == service_data->id_query_map.end()) { + RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); + return RMW_RET_ERROR; + } + const z_query_t z_loaned_query = z_query_loan(&query_it->second); z_query_reply_options_t options = z_query_reply_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - const z_query_t loaned_query = z_loan(*owned_query_ptr); - const z_query_t * query_ptr = &loaned_query; + // const z_query_t loaned_query = z_loan(*owned_query_ptr); + // const z_query_t * query_ptr = &loaned_query; z_query_reply( - query_ptr, z_query_keyexpr(query_ptr), reinterpret_cast( + &z_loaned_query, z_query_keyexpr(&z_loaned_query), reinterpret_cast( response_bytes), data_length + meta_length, &options); - z_drop(z_move(*owned_query_ptr)); + z_drop(z_move(query_it->second)); + service_data->id_query_map.erase(query_it); return RMW_RET_OK; } From f50f8f3f8e70f0de2dd0f91c9fe535a7672ef576 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 27 Dec 2023 21:13:28 +0800 Subject: [PATCH 4/5] Style Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 443a1058..ab267eee 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1885,7 +1885,7 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) z_drop(z_move(client_data->zn_closure_reply)); // z_drop(z_move(client_data->channel)); z_drop(z_move(client_data->keyexpr)); - for(z_owned_reply_t & reply : client_data->replies) { + for (z_owned_reply_t & reply : client_data->replies) { z_reply_drop(&reply); } client_data->replies.clear(); @@ -1990,7 +1990,9 @@ rmw_send_request( client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); - z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); + z_get( + z_loan(context_impl->session), z_loan( + client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); // z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16); // z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", z_move(channel.send), @@ -2085,7 +2087,7 @@ rmw_take_response( *taken = true; - for(z_owned_reply_t & reply : client_data->replies) { + for (z_owned_reply_t & reply : client_data->replies) { z_reply_drop(&reply); } client_data->replies.clear(); From cbb8e16025e828960359920189d44ca99593036e Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 27 Dec 2023 21:36:01 +0800 Subject: [PATCH 5/5] Cleanup comments Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 27 --------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 4 -- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 61 +-------------------- 3 files changed, 1 insertion(+), 91 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 2d4b4a49..1fd9cc06 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -136,18 +136,6 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } - // RCUTILS_LOG_INFO_NAMED( - // "rmw_zenoh_cpp", - // "[client_data_handler] triggered for %s", - // client_data->service_name - // ); - // if (!z_check(*reply)) { - // RCUTILS_LOG_ERROR_NAMED( - // "rmw_zenoh_cpp", - // "z_check returned False" - // ); - // return; - // } if (!z_reply_check(reply)) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -162,21 +150,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } - - // z_sample_t sample = z_reply_ok(reply); - - // z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - - // RCUTILS_LOG_DEBUG_NAMED( - // "rmw_zenoh_cpp", - // "[client_data_handler] keyexpr of sample: %s", - // z_loan(keystr) - // ); - { std::lock_guard msg_lock(client_data->message_mutex); - // client_data->message = std::make_unique( - // zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); // Take ownership of the reply. client_data->replies.emplace_back(*reply); *reply = z_reply_null(); @@ -187,6 +162,4 @@ void client_data_handler(z_owned_reply_t * reply, void * data) client_data->condition->notify_one(); } } - - // z_drop(z_move(keystr)); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 05c2793b..4d04cdbd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -137,7 +137,6 @@ void service_data_handler(const z_query_t * query, void * service_data); void client_data_handler(z_owned_reply_t * reply, void * client_data); - ///============================================================================== struct rmw_service_data_t @@ -172,15 +171,12 @@ struct rmw_service_data_t struct rmw_client_data_t { - // const char * service_name; z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; - // z_owned_reply_channel_t channel; std::mutex message_mutex; std::vector replies; - // std::unique_ptr message; const void * request_type_support_impl; const void * response_type_support_impl; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index ab267eee..4cc1ad71 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1883,7 +1883,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); - // z_drop(z_move(client_data->channel)); z_drop(z_move(client_data->keyexpr)); for (z_owned_reply_t & reply : client_data->replies) { z_reply_drop(&reply); @@ -1974,53 +1973,11 @@ rmw_send_request( opts.target = Z_QUERY_TARGET_ALL; opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - - // z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - // client->service_name, client_data->context->actual_domain_id, allocator); - // auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - // [&keyexpr]() { - // z_keyexpr_drop(z_move(keyexpr)); - // }); - // if (!z_keyexpr_check(&keyexpr)) { - // RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - // return RMW_RET_ERROR; - // } - - // client_data->service_name = client->service_name; - client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); - z_get( z_loan(context_impl->session), z_loan( client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); - // z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16); - // z_get(z_loan(context_impl->session), z_loan(client_data->keyexpr), "", z_move(channel.send), - // &opts); // here, the send is moved and will be dropped by zenoh when adequate - // z_owned_reply_t reply = z_reply_null(); - // for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - // call_success = z_call(channel.recv, &reply)) { - // if (!call_success) { - // RCUTILS_LOG_WARN_NAMED( - // "rmw_zenoh_cpp", "[rmw_send_request] call unsuccessful"); - // continue; - // } - // if (z_reply_is_ok(&reply)) { - // client_data->replies.push_back(std::move(reply)); - // // reply = z_reply_null(); - // } else { - // RCUTILS_LOG_WARN_NAMED( - // "rmw_zenoh_cpp", "[rmw_send_request] z_reply is not ok"); - // return RMW_RET_ERROR; - // } - // } - // std::lock_guard internal_lock(client_data->internal_mutex); - // if (client_data->condition != nullptr) { - // client_data->condition->notify_one(); - // } - - // z_drop(z_move(channel)); - // z_reply_drop(&reply); return RMW_RET_OK; } @@ -2061,9 +2018,6 @@ rmw_take_response( return RMW_RET_ERROR; } latest_reply = &client_data->replies.back(); - // msg_data = std::move(client_data->message); - // client_data->message.release(); - z_sample_t sample = z_reply_ok(latest_reply); // Object that manages the raw buffer @@ -2091,7 +2045,6 @@ rmw_take_response( z_reply_drop(&reply); } client_data->replies.clear(); - // zc_payload_drop(&(msg_data->payload)); // TODO(francocipollone): Verify request_header information. request_header->request_id.sequence_number = 0; @@ -2433,16 +2386,10 @@ rmw_take_request( RMW_SET_ERROR_MSG("Query id not found in id_query_map"); return RMW_RET_ERROR; } - // const z_owned_query_t * owned_query_ptr = (*query_it).second.get(); - // TODO(francocipollone): Remove the query id from the to_take collection service_data->to_take.pop_back(); service_data->query_queue_mutex.unlock(); // DESERIALIZE MESSAGE ======================================================== - // if (!z_query_check(owned_query_ptr)) { - // RMW_SET_ERROR_MSG("onwed_query_t contains gravestone, can't deserialize message"); - // return RMW_RET_ERROR; - // } const z_query_t z_loaned_query = z_query_loan(&query_it->second); z_value_t payload_value = z_query_value(&z_loaned_query); @@ -2465,8 +2412,6 @@ rmw_take_request( return RMW_RET_ERROR; } - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] deserialized message"); - // Fill in the request header. request_header->request_id.sequence_number = query_id; @@ -2553,13 +2498,10 @@ rmw_send_response( return RMW_RET_ERROR; } const z_query_t z_loaned_query = z_query_loan(&query_it->second); - z_query_reply_options_t options = z_query_reply_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - // const z_query_t loaned_query = z_loan(*owned_query_ptr); - // const z_query_t * query_ptr = &loaned_query; z_query_reply( - &z_loaned_query, z_query_keyexpr(&z_loaned_query), reinterpret_cast( + &z_loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length + meta_length, &options); z_drop(z_move(query_it->second)); @@ -2913,7 +2855,6 @@ rmw_wait( // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL if (client_data->replies.empty()) { - printf("client replies are empty!!"); // Setting to nullptr lets rcl know that this client is not ready clients->clients[i] = nullptr; }