diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 57d6b3da..d26c9e0d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -81,13 +81,6 @@ void sub_data_handler( } } - -//============================================================================== -std::size_t rmw_service_data_t::get_new_uid() -{ - return client_count++; -} - //============================================================================== void service_data_handler(const z_query_t * query, void * data) { @@ -115,10 +108,7 @@ void service_data_handler(const z_query_t * query, void * data) // Get the query parameters and payload { std::lock_guard lock(service_data->query_queue_mutex); - const std::size_t client_id = service_data->get_new_uid(); - service_data->id_query_map.emplace( - std::make_pair(client_id, z_query_clone(query))); - service_data->to_take.push_back(client_id); + service_data->query_queue.push_back(z_query_clone(query)); } { // Since we added new data, trigger the guard condition if it is available diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 40c5a28e..de45faba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -133,7 +133,6 @@ struct rmw_subscription_data_t ///============================================================================== -// z_owned_closure_query_t void service_data_handler(const z_query_t * query, void * service_data); void client_data_handler(z_owned_reply_t * reply, void * client_data); @@ -158,17 +157,16 @@ struct rmw_service_data_t rmw_context_t * context; - // Map to store the query id and the query. - // The query handler is saved as it is needed to answer the query later on. - std::unordered_map id_query_map; - // The query id's of the queries that need to be processed. - std::deque to_take; + // Deque to store the queries in the order they arrive. + std::deque query_queue; std::mutex query_queue_mutex; + // Map to store the sequence_number -> query_id + std::map sequence_to_query_map; + std::mutex sequence_to_query_map_mutex; + std::mutex internal_mutex; std::condition_variable * condition{nullptr}; - - std::size_t client_count = 0; }; ///============================================================================== @@ -182,7 +180,7 @@ struct rmw_client_data_t zc_owned_liveliness_token_t token; std::mutex message_mutex; - std::vector replies; + std::deque replies; const void * request_type_support_impl; const void * response_type_support_impl; @@ -194,6 +192,8 @@ struct rmw_client_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + + size_t sequence_number{1}; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 45148d97..44c5748a 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -1486,7 +1487,7 @@ static rmw_ret_t __rmw_take( std::unique_ptr msg_data; { - std::unique_lock lock(sub_data->message_queue_mutex); + std::lock_guard lock(sub_data->message_queue_mutex); if (sub_data->message_queue.empty()) { // This tells rcl that the check for a new message was done, but no messages have come in yet. @@ -1957,6 +1958,26 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_OK; } +static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number) +{ + z_owned_bytes_map_t map = z_bytes_map_new(); + if (!z_check(map)) { + RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); + return z_bytes_map_null(); + } + + // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. + // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. + char seq_id_str[20]; + if (rcutils_snprintf(seq_id_str, 20, "%" PRId64, sequence_number) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + + return map; +} + //============================================================================== /// Send a ROS service request. rmw_ret_t @@ -2025,11 +2046,24 @@ rmw_send_request( size_t data_length = ser.getSerializedDataLength(); - // TODO(francocipollone): Do I really need the sequency number here? - *sequence_id = 0; + // TODO(clalancette): Locking for multiple requests at the same time + *sequence_id = client_data->sequence_number++; // Send request z_get_options_t opts = z_get_options_default(); + + z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + opts.attachment = z_bytes_map_as_attachment(&map); + opts.target = Z_QUERY_TARGET_ALL; opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); @@ -2041,6 +2075,65 @@ rmw_send_request( return RMW_RET_OK; } +static int64_t get_sequence_num_from_attachment(const z_attachment_t * const attachment) +{ + // Get the sequence_number out of the attachment + if (!z_check(*attachment)) { + // A valid request must have had an attachment + RMW_SET_ERROR_MSG("Could not get attachment from query"); + return -1; + } + + z_bytes_t sequence_num_index = z_attachment_get(*attachment, z_bytes_new("sequence_number")); + if (!z_check(sequence_num_index)) { + // A valid request must have had a sequence number attached + RMW_SET_ERROR_MSG("Could not get sequence number from query"); + return -1; + } + + if (sequence_num_index.len < 1) { + RMW_SET_ERROR_MSG("No value specified for the sequence number"); + return -1; + } + + if (sequence_num_index.len > 19) { + // The sequence number was larger than we expected + RMW_SET_ERROR_MSG("Sequence number too large"); + return -1; + } + + // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. + // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. + char sequence_num_str[20]; + + memcpy(sequence_num_str, sequence_num_index.start, sequence_num_index.len); + sequence_num_str[sequence_num_index.len] = '\0'; + + errno = 0; + char * endptr; + int64_t seqnum = strtol(sequence_num_str, &endptr, 10); + if (seqnum == 0) { + // This is an error regardless; the client should never send this + RMW_SET_ERROR_MSG("A invalid zero value sent as the sequence number"); + return -1; + } else if (endptr == sequence_num_str) { + // No values were converted, this is an error + RMW_SET_ERROR_MSG("No valid numbers available in the sequence number"); + return -1; + } else if (*endptr != '\0') { + // There was junk after the number + RMW_SET_ERROR_MSG("Non-numeric values in the sequence number"); + return -1; + } else if (errno != 0) { + // Some other error occurred, which may include overflow or underflow + RMW_SET_ERROR_MSG( + "An undefined error occurred while getting the sequence number, this may be an overflow"); + return -1; + } + + return seqnum; +} + //============================================================================== /// Take an incoming ROS service response. rmw_ret_t @@ -2073,11 +2166,10 @@ rmw_take_response( std::lock_guard lock(client_data->message_mutex); if (client_data->replies.empty()) { - // TODO(francocipollone): Verify behavior. RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); return RMW_RET_ERROR; } - latest_reply = &client_data->replies.back(); + latest_reply = &client_data->replies.front(); z_sample_t sample = z_reply_ok(latest_reply); // Object that manages the raw buffer @@ -2099,15 +2191,18 @@ rmw_take_response( return RMW_RET_ERROR; } - *taken = true; - - for (z_owned_reply_t & reply : client_data->replies) { - z_reply_drop(&reply); + request_header->request_id.sequence_number = get_sequence_num_from_attachment(&sample.attachment); + if (request_header->request_id.sequence_number < 0) { + // get_sequence_num_from_attachment already set an error + return RMW_RET_ERROR; } - client_data->replies.clear(); + // TODO(clalancette): We also need to fill in the source_timestamp, received_timestamp, + // and writer_guid - // TODO(francocipollone): Verify request_header information. - request_header->request_id.sequence_number = 0; + *taken = true; + + client_data->replies.pop_front(); + z_reply_drop(latest_reply); return RMW_RET_OK; } @@ -2432,9 +2527,11 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - for (auto & id_query : service_data->id_query_map) { - z_drop(z_move(id_query.second)); + for (z_owned_query_t & query : service_data->query_queue) { + z_drop(z_move(query)); } + service_data->query_queue.clear(); + service_data->sequence_to_query_map.clear(); z_drop(z_move(service_data->token)); allocator->deallocate(service_data->request_type_support, allocator->state); @@ -2477,24 +2574,40 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - std::unique_lock lock(service_data->query_queue_mutex); - if (service_data->id_query_map.empty()) { - // TODO(francocipollone): Verify behavior. - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] Take id_query_map is empty"); - return RMW_RET_OK; + z_owned_query_t query; + { + std::lock_guard lock(service_data->query_queue_mutex); + if (service_data->query_queue.empty()) { + return RMW_RET_OK; + } + query = service_data->query_queue.front(); } - const std::size_t query_id = service_data->to_take.back(); - auto query_it = service_data->id_query_map.find(query_id); - if (query_it == service_data->id_query_map.end()) { - RMW_SET_ERROR_MSG("Query id not found in id_query_map"); + + const z_query_t loaned_query = z_query_loan(&query); + + // Get the sequence_number out of the attachment + z_attachment_t attachment = z_query_attachment(&loaned_query); + + int64_t sequence_number = get_sequence_num_from_attachment(&attachment); + if (sequence_number < 0) { + // get_sequence_number_from_attachment already set the error return RMW_RET_ERROR; } - service_data->to_take.pop_back(); - service_data->query_queue_mutex.unlock(); + + // Add this query to the map, so that rmw_send_response can quickly look it up later + { + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + if (service_data->sequence_to_query_map.find(sequence_number) != + service_data->sequence_to_query_map.end()) + { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + service_data->sequence_to_query_map.emplace(std::pair(sequence_number, query)); + } // DESERIALIZE MESSAGE ======================================================== - const z_query_t z_loaned_query = z_query_loan(&query_it->second); - z_value_t payload_value = z_query_value(&z_loaned_query); + z_value_t payload_value = z_query_value(&loaned_query); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( @@ -2516,7 +2629,11 @@ rmw_take_request( } // Fill in the request header. - request_header->request_id.sequence_number = query_id; + // TODO(clalancette): We also need to fill in writer_guid, source_timestamp, + // and received_timestamp + request_header->request_id.sequence_number = sequence_number; + + service_data->query_queue.pop_front(); *taken = true; @@ -2589,28 +2706,36 @@ rmw_send_response( size_t data_length = ser.getSerializedDataLength(); - size_t meta_length = sizeof(request_header->sequence_number); - memcpy( - &response_bytes[data_length], - reinterpret_cast(&request_header->sequence_number), - meta_length); - // Create the queryable payload - std::lock_guard lock(service_data->query_queue_mutex); - auto query_it = service_data->id_query_map.find(request_header->sequence_number); - if (query_it == service_data->id_query_map.end()) { + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + auto query_it = service_data->sequence_to_query_map.find(request_header->sequence_number); + if (query_it == service_data->sequence_to_query_map.end()) { RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t z_loaned_query = z_query_loan(&query_it->second); + const z_query_t loaned_query = z_query_loan(&query_it->second); z_query_reply_options_t options = z_query_reply_options_default(); + + // TODO(clalancette): We also need to fill in and send the writer_guid + z_owned_bytes_map_t map = create_map_and_set_sequence_num(request_header->sequence_number); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + options.attachment = z_bytes_map_as_attachment(&map); + options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); z_query_reply( - &z_loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( - response_bytes), data_length + meta_length, &options); + &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( + response_bytes), data_length, &options); z_drop(z_move(query_it->second)); - service_data->id_query_map.erase(query_it); + service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } @@ -2943,7 +3068,7 @@ rmw_wait( auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { serv_data->condition = nullptr; - if (serv_data->to_take.empty()) { + if (serv_data->query_queue.empty()) { // Setting to nullptr lets rcl know that this service is not ready services->services[i] = nullptr; }