diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 9e795251..1fd9cc06 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -22,13 +22,13 @@ #include "rmw_data_types.hpp" ///============================================================================== - saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) : payload(p), recv_timestamp(recv_ts) { memcpy(publisher_gid, pub_gid, 16); } +//============================================================================== void sub_data_handler( const z_sample_t * sample, void * data) @@ -79,12 +79,14 @@ void sub_data_handler( } -unsigned int rmw_service_data_t::get_new_uid() +//============================================================================== +std::size_t rmw_service_data_t::get_new_uid() { return client_count++; } -void service_data_handler(const z_query_t * query, void * service_data) +//============================================================================== +void service_data_handler(const z_query_t * query, void * data) { RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -92,8 +94,8 @@ void service_data_handler(const z_query_t * query, void * service_data) ); z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); - auto rmw_service_data = static_cast(service_data); - if (rmw_service_data == nullptr) { + rmw_service_data_t * service_data = static_cast(data); + if (service_data == nullptr) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain rmw_service_data_t from data for " @@ -106,43 +108,31 @@ void service_data_handler(const z_query_t * query, void * service_data) // Get the query parameters and payload { - std::lock_guard lock(rmw_service_data->query_queue_mutex); - - const unsigned int client_id = rmw_service_data->get_new_uid(); - rmw_service_data->id_query_map.emplace( - std::make_pair(client_id, std::make_unique(z_query_clone(query)))); - rmw_service_data->to_take.push_back(client_id); - - + std::lock_guard lock(service_data->query_queue_mutex); + const std::size_t client_id = service_data->get_new_uid(); + service_data->id_query_map.emplace( + std::make_pair(client_id, z_query_clone(query))); + service_data->to_take.push_back(client_id); + } + { // Since we added new data, trigger the guard condition if it is available - std::lock_guard internal_lock(rmw_service_data->internal_mutex); - if (rmw_service_data->condition != nullptr) { - rmw_service_data->condition->notify_one(); + std::lock_guard internal_lock(service_data->internal_mutex); + if (service_data->condition != nullptr) { + service_data->condition->notify_one(); } } z_drop(z_move(keystr)); } -void client_data_handler(z_owned_reply_t * reply, void * client_data) +//============================================================================== +void client_data_handler(z_owned_reply_t * reply, void * data) { - auto rmw_client_data = static_cast(client_data); - if (rmw_client_data == nullptr) { - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain rmw_client_data_t " - ); - return; - } - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[client_data_handler] triggered for %s", - rmw_client_data->service_name - ); - if (!z_check(*reply)) { + auto client_data = static_cast(data); + if (client_data == nullptr) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "z_check returned False" + "Unable to obtain client_data_t " ); return; } @@ -160,29 +150,16 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data) ); return; } - - z_sample_t sample = z_reply_ok(reply); - - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "[client_data_handler] keyexpr of sample: %s", - z_loan(keystr) - ); - { - std::lock_guard msg_lock(rmw_client_data->message_mutex); - rmw_client_data->message = std::make_unique( - zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); + std::lock_guard msg_lock(client_data->message_mutex); + // Take ownership of the reply. + client_data->replies.emplace_back(*reply); + *reply = z_reply_null(); } { - std::lock_guard internal_lock(rmw_client_data->internal_mutex); - if (rmw_client_data->condition != nullptr) { - rmw_client_data->condition->notify_one(); + std::lock_guard internal_lock(client_data->internal_mutex); + if (client_data->condition != nullptr) { + client_data->condition->notify_one(); } } - - z_reply_drop(reply); - z_drop(z_move(keystr)); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 0450f69c..4d04cdbd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -137,14 +137,13 @@ void service_data_handler(const z_query_t * query, void * service_data); void client_data_handler(z_owned_reply_t * reply, void * client_data); - ///============================================================================== struct rmw_service_data_t { - unsigned int get_new_uid(); + std::size_t get_new_uid(); - const char * keyexpr; + z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; const void * request_type_support_impl; @@ -157,27 +156,27 @@ struct rmw_service_data_t // Map to store the query id and the query. // The query handler is saved as it is needed to answer the query later on. - std::unordered_map> id_query_map; + std::unordered_map id_query_map; // The query id's of the queries that need to be processed. - std::deque to_take; + std::deque to_take; std::mutex query_queue_mutex; std::mutex internal_mutex; std::condition_variable * condition{nullptr}; - unsigned int client_count{}; + std::size_t client_count = 0; }; ///============================================================================== struct rmw_client_data_t { - const char * service_name; + z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; std::mutex message_mutex; - std::unique_ptr message; + std::vector replies; const void * request_type_support_impl; const void * response_type_support_impl; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 65165f0e..4cc1ad71 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1739,6 +1739,13 @@ rmw_create_client( allocator->deallocate(client_data, allocator->state); }); + RMW_TRY_PLACEMENT_NEW(client_data, client_data, return nullptr, rmw_client_data_t); + auto destruct_client_data = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->~rmw_client_data_t(), + rmw_client_data_t); + }); // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); @@ -1809,43 +1816,39 @@ rmw_create_client( }); // Populate the rmw_client. - rmw_client->data = client_data; rmw_client->implementation_identifier = rmw_zenoh_identifier; - rmw_client->service_name = rcutils_strdup(service_name, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( rmw_client->service_name, "failed to allocate client name", return nullptr); - auto free_service_name = rcpputils::make_scope_exit( [rmw_client, allocator]() { allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); }); - // Zenoh implementation for the client - - // TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function. - // If this is enough simply rename the method. - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + client_data->keyexpr = ros_topic_name_to_zenoh_key( rmw_client->service_name, node->context->actual_domain_id, allocator); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [client_data]() { + z_keyexpr_drop(z_move(client_data->keyexpr)); }); - if (!z_keyexpr_check(&keyexpr)) { + if (!z_keyexpr_check(&client_data->keyexpr)) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + rmw_client->data = client_data; + free_rmw_client.cancel(); free_client_data.cancel(); free_request_type_support.cancel(); destruct_request_type_support.cancel(); free_response_type_support.cancel(); + destruct_client_data.cancel(); destruct_response_type_support.cancel(); free_service_name.cancel(); + free_ros_keyexpr.cancel(); return rmw_client; } @@ -1855,11 +1858,10 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - RCUTILS_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "[rmw_destroy_client] %s", client->service_name); - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -1873,14 +1875,19 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) rcutils_allocator_t * allocator = &node->context->options.allocator; - auto client_data = static_cast(client->data); + rmw_client_data_t * client_data = static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( client_data, - "client implementation pointer is null", + "client implementation pointer is null.", return RMW_RET_INVALID_ARGUMENT); // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); + z_drop(z_move(client_data->keyexpr)); + for (z_owned_reply_t & reply : client_data->replies) { + z_reply_drop(&reply); + } + client_data->replies.clear(); allocator->deallocate(client_data->request_type_support, allocator->state); allocator->deallocate(client_data->response_type_support, allocator->state); @@ -1889,8 +1896,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_destroy_client] %s FINISHED", client->service_name); return RMW_RET_OK; } @@ -1904,24 +1909,22 @@ rmw_send_request( { RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", "[rmw_send_request]"); - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + rmw_client_data_t * client_data = static_cast(client->data); RMW_CHECK_FOR_NULL_WITH_MSG( - client->data, - "client implementation pointer is null", + client_data, + "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - auto * client_data = static_cast(client->data); - rmw_context_impl_s * context_impl = static_cast( client_data->context->impl); @@ -1967,24 +1970,13 @@ rmw_send_request( // Send request z_get_options_t opts = z_get_options_default(); + opts.target = Z_QUERY_TARGET_ALL; opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( - client->service_name, client_data->context->actual_domain_id, allocator); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return RMW_RET_ERROR; - } - - client_data->service_name = client->service_name; client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); - - z_get(z_loan(context_impl->session), z_loan(keyexpr), "", &client_data->zn_closure_reply, &opts); + z_get( + z_loan(context_impl->session), z_loan( + client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); return RMW_RET_OK; } @@ -2002,41 +1994,36 @@ rmw_take_response( RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( client, client->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - RMW_CHECK_FOR_NULL_WITH_MSG( client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_FOR_NULL_WITH_MSG( - client->data, "client implementation pointer is null", RMW_RET_INVALID_ARGUMENT); - - auto client_data = static_cast(client->data); - - std::unique_ptr msg_data = nullptr; + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - { - std::lock_guard lock(client_data->message_mutex); - if (client_data->message == nullptr) { - // TODO(francocipollone): Verify behavior. - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); - return RMW_RET_ERROR; - } + z_owned_reply_t * latest_reply = nullptr; - msg_data = std::move(client_data->message); - client_data->message.release(); + std::lock_guard lock(client_data->message_mutex); + if (client_data->replies.empty()) { + // TODO(francocipollone): Verify behavior. + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; } + latest_reply = &client_data->replies.back(); + z_sample_t sample = z_reply_ok(latest_reply); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(msg_data->payload.payload.start)), - msg_data->payload.payload.len); + reinterpret_cast(const_cast(sample.payload.start)), + sample.payload.len); // Object that serializes the data eprosima::fastcdr::Cdr deser( @@ -2053,7 +2040,11 @@ rmw_take_response( } *taken = true; - zc_payload_drop(&(msg_data->payload)); + + for (z_owned_reply_t & reply : client_data->replies) { + z_reply_drop(&reply); + } + client_data->replies.clear(); // TODO(francocipollone): Verify request_header information. request_header->request_id.sequence_number = 0; @@ -2158,7 +2149,6 @@ rmw_create_service( rmw_service, "failed to allocate memory for the service", return nullptr); - auto free_rmw_service = rcpputils::make_scope_exit( [rmw_service, allocator]() { allocator->deallocate(rmw_service, allocator->state); @@ -2207,16 +2197,14 @@ rmw_create_service( // Request type support service_data->request_type_support = static_cast( allocator->allocate(sizeof(RequestTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( service_data->request_type_support, "Failed to allocate RequestTypeSupport", return nullptr); auto free_request_type_support = rcpputils::make_scope_exit( - [service_data, allocator]() { - allocator->deallocate(service_data->request_type_support, allocator->state); + [request_type_support = service_data->request_type_support, allocator]() { + allocator->deallocate(request_type_support, allocator->state); }); - RMW_TRY_PLACEMENT_NEW( service_data->request_type_support, service_data->request_type_support, @@ -2232,16 +2220,14 @@ rmw_create_service( // Response type support service_data->response_type_support = static_cast( allocator->allocate(sizeof(ResponseTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( service_data->response_type_support, "Failed to allocate ResponseTypeSupport", return nullptr); auto free_response_type_support = rcpputils::make_scope_exit( - [service_data, allocator]() { - allocator->deallocate(service_data->response_type_support, allocator->state); + [response_type_support = service_data->response_type_support, allocator]() { + allocator->deallocate(response_type_support, allocator->state); }); - RMW_TRY_PLACEMENT_NEW( service_data->response_type_support, service_data->response_type_support, @@ -2255,46 +2241,34 @@ rmw_create_service( }); // Populate the rmw_service. - rmw_service->data = service_data; rmw_service->implementation_identifier = rmw_zenoh_identifier; - rmw_service->service_name = rcutils_strdup(service_name, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( rmw_service->service_name, "failed to allocate service name", return nullptr); - auto free_service_name = rcpputils::make_scope_exit( [rmw_service, allocator]() { allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); }); - - // Zenoh implementation for the service - - // TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function. - // If this is enough simply rename the method. - z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + service_data->keyexpr = ros_topic_name_to_zenoh_key( rmw_service->service_name, node->context->actual_domain_id, allocator); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); + auto free_ros_keyexpr = rcpputils::make_scope_exit( + [service_data]() { + if (service_data) { + z_drop(z_move(service_data->keyexpr)); + } }); - if (!z_keyexpr_check(&keyexpr)) { + if (!z_check(z_loan(service_data->keyexpr))) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - service_data->keyexpr = z_keyexpr_to_string(z_loan(keyexpr))._cstr; - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_create_service] keyexpr: %s", - z_loan(z_keyexpr_to_string(z_loan(keyexpr)))); z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data); service_data->qable = z_declare_queryable( z_loan(context_impl->session), - z_loan(keyexpr), + z_loan(service_data->keyexpr), z_move(callback), nullptr); @@ -2302,7 +2276,6 @@ rmw_create_service( RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } - auto undeclare_z_queryable = rcpputils::make_scope_exit( [service_data]() { z_undeclare_queryable(z_move(service_data->qable)); @@ -2310,6 +2283,8 @@ rmw_create_service( // TODO(francocipollone): Update graph cache. + rmw_service->data = service_data; + free_rmw_service.cancel(); free_service_data.cancel(); free_service_name.cancel(); @@ -2318,6 +2293,7 @@ rmw_create_service( destruct_response_type_support.cancel(); free_request_type_support.cancel(); free_response_type_support.cancel(); + free_ros_keyexpr.cancel(); undeclare_z_queryable.cancel(); return rmw_service; } @@ -2330,6 +2306,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -2343,13 +2320,14 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) rcutils_allocator_t * allocator = &node->context->options.allocator; - auto service_data = static_cast(service->data); + rmw_service_data_t * service_data = static_cast(service->data); RMW_CHECK_FOR_NULL_WITH_MSG( service_data, - "service implementation pointer is null", + "Unable to retrieve service_data from service", return RMW_RET_INVALID_ARGUMENT); // CLEANUP ================================================================ + z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); for (auto & id_query : service_data->id_query_map) { z_drop(z_move(id_query.second)); @@ -2359,7 +2337,8 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) allocator->deallocate(service_data->response_type_support, allocator->state); allocator->deallocate(service->data, allocator->state); - rmw_service_free(service); + allocator->deallocate(const_cast(service->service_name), allocator->state); + allocator->deallocate(service, allocator->state); // TODO(francocipollone): Update graph cache. return RMW_RET_OK; @@ -2378,6 +2357,7 @@ rmw_take_request( RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); @@ -2389,36 +2369,28 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->service_name, "service has no service name", RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_FOR_NULL_WITH_MSG( - service->data, "service implementation pointer is null", RMW_RET_INVALID_ARGUMENT); - auto * service_data = static_cast(service->data); + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); std::unique_lock lock(service_data->query_queue_mutex); - if (service_data->id_query_map.empty()) { // TODO(francocipollone): Verify behavior. RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] Take id_query_map is empty"); return RMW_RET_OK; } - - const unsigned int query_id = service_data->to_take.back(); - const auto query_ret = service_data->id_query_map.find(query_id); - if (query_ret == service_data->id_query_map.end()) { + const std::size_t query_id = service_data->to_take.back(); + auto query_it = service_data->id_query_map.find(query_id); + if (query_it == service_data->id_query_map.end()) { RMW_SET_ERROR_MSG("Query id not found in id_query_map"); return RMW_RET_ERROR; } - const z_owned_query_t * owned_query_ptr = (*query_ret).second.get(); - // TODO(francocipollone): Remove the query id from the to_take collection service_data->to_take.pop_back(); service_data->query_queue_mutex.unlock(); // DESERIALIZE MESSAGE ======================================================== - if (!z_query_check(owned_query_ptr)) { - RMW_SET_ERROR_MSG("onwed_query_t contains gravestone, can't deserialize message"); - return RMW_RET_ERROR; - } - const z_query_t z_loaned_query = z_query_loan(owned_query_ptr); + const z_query_t z_loaned_query = z_query_loan(&query_it->second); z_value_t payload_value = z_query_value(&z_loaned_query); // Object that manages the raw buffer @@ -2440,8 +2412,6 @@ rmw_take_request( return RMW_RET_ERROR; } - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] deserialized message"); - // Fill in the request header. request_header->request_id.sequence_number = query_id; @@ -2462,6 +2432,7 @@ rmw_send_response( "rmw_zenoh_cpp", "[rmw_send_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); @@ -2473,10 +2444,10 @@ rmw_send_response( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, - "service implementation pointer is null", + "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - auto * service_data = static_cast(service->data); + rmw_service_data_t * service_data = static_cast(service->data); rcutils_allocator_t * allocator = &(service_data->context->options.allocator); @@ -2520,19 +2491,21 @@ rmw_send_response( meta_length); // Create the queryable payload - service_data->query_queue_mutex.lock(); - auto owned_query_ptr = service_data->id_query_map[request_header->sequence_number].get(); - service_data->query_queue_mutex.unlock(); - + std::lock_guard lock(service_data->query_queue_mutex); + auto query_it = service_data->id_query_map.find(request_header->sequence_number); + if (query_it == service_data->id_query_map.end()) { + RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); + return RMW_RET_ERROR; + } + const z_query_t z_loaned_query = z_query_loan(&query_it->second); z_query_reply_options_t options = z_query_reply_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - const z_query_t loaned_query = z_loan(*owned_query_ptr); - const z_query_t * query_ptr = &loaned_query; z_query_reply( - query_ptr, z_query_keyexpr(query_ptr), reinterpret_cast( + &z_loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length + meta_length, &options); - z_drop(z_move(*owned_query_ptr)); + z_drop(z_move(query_it->second)); + service_data->id_query_map.erase(query_it); return RMW_RET_OK; } @@ -2807,7 +2780,7 @@ rmw_wait( // Go through each of the clients and attach the wait set condition variable to them. // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < clients->client_count; ++i) { - auto client_data = static_cast(clients->clients[i]); + rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { client_data->condition = &wait_set_data->condition_variable; } @@ -2876,12 +2849,12 @@ rmw_wait( if (clients) { // Now detach the condition variable and mutex from each of the clients for (size_t i = 0; i < clients->client_count; ++i) { - auto client_data = static_cast(clients->clients[i]); + rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { client_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL - if (client_data->message == nullptr) { + if (client_data->replies.empty()) { // Setting to nullptr lets rcl know that this client is not ready clients->clients[i] = nullptr; }