From 6effcc987875ecafab52a14d43f186d2afc4da23 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 12 Jan 2024 16:14:12 +0000 Subject: [PATCH 01/18] Switch to unique_ptr for the query_queue. This is not strictly required, but it does make ownership clearer. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 4 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 43 ++++++++++----------- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index d26c9e0d..58621bb6 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -108,7 +108,7 @@ void service_data_handler(const z_query_t * query, void * data) // Get the query parameters and payload { std::lock_guard lock(service_data->query_queue_mutex); - service_data->query_queue.push_back(z_query_clone(query)); + service_data->query_queue.push_back(std::make_unique(z_query_clone(query))); } { // Since we added new data, trigger the guard condition if it is available diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index de45faba..fda4873a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -158,11 +158,11 @@ struct rmw_service_data_t rmw_context_t * context; // Deque to store the queries in the order they arrive. - std::deque query_queue; + std::deque> query_queue; std::mutex query_queue_mutex; // Map to store the sequence_number -> query_id - std::map sequence_to_query_map; + std::unordered_map> sequence_to_query_map; std::mutex sequence_to_query_map_mutex; std::mutex internal_mutex; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5653fae6..0b4a50d3 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1969,7 +1969,7 @@ static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_numb // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. char seq_id_str[20]; - if (rcutils_snprintf(seq_id_str, 20, "%" PRId64, sequence_number) < 0) { + if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); return z_bytes_map_null(); } @@ -2204,8 +2204,8 @@ rmw_take_response( *taken = true; - client_data->replies.pop_front(); z_reply_drop(latest_reply); + client_data->replies.pop_front(); return RMW_RET_OK; } @@ -2532,8 +2532,8 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - for (z_owned_query_t & query : service_data->query_queue) { - z_drop(z_move(query)); + for (std::unique_ptr & query : service_data->query_queue) { + z_drop(z_move(*query.get())); } z_drop(z_move(service_data->token)); @@ -2577,16 +2577,16 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - z_owned_query_t query; + std::unique_ptr query; { std::lock_guard lock(service_data->query_queue_mutex); if (service_data->query_queue.empty()) { return RMW_RET_OK; } - query = service_data->query_queue.front(); + query = std::move(service_data->query_queue.front()); } - const z_query_t loaned_query = z_query_loan(&query); + const z_query_t loaned_query = z_query_loan(query.get()); // Get the sequence_number out of the attachment z_attachment_t attachment = z_query_attachment(&loaned_query); @@ -2597,18 +2597,6 @@ rmw_take_request( return RMW_RET_ERROR; } - // Add this query to the map, so that rmw_send_response can quickly look it up later - { - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - if (service_data->sequence_to_query_map.find(sequence_number) != - service_data->sequence_to_query_map.end()) - { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); - return RMW_RET_ERROR; - } - service_data->sequence_to_query_map.emplace(std::pair(sequence_number, query)); - } - // DESERIALIZE MESSAGE ======================================================== z_value_t payload_value = z_query_value(&loaned_query); @@ -2631,6 +2619,18 @@ rmw_take_request( return RMW_RET_ERROR; } + // Add this query to the map, so that rmw_send_response can quickly look it up later + { + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + if (service_data->sequence_to_query_map.find(sequence_number) != + service_data->sequence_to_query_map.end()) + { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + service_data->sequence_to_query_map.emplace(std::pair(sequence_number, std::move(query))); + } + // Fill in the request header. // TODO(clalancette): We also need to fill in writer_guid, source_timestamp, // and received_timestamp @@ -2716,7 +2716,7 @@ rmw_send_response( RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t loaned_query = z_query_loan(&query_it->second); + const z_query_t loaned_query = z_query_loan(query_it->second.get()); z_query_reply_options_t options = z_query_reply_options_default(); // TODO(clalancette): We also need to fill in and send the writer_guid @@ -2737,7 +2737,7 @@ rmw_send_response( &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length, &options); - z_drop(z_move(query_it->second)); + z_drop(z_move(*query_it->second.get())); service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } @@ -2997,7 +2997,6 @@ rmw_wait( } } - if (services) { // Go through each of the services and attach the wait set condition variable to them. // That way they can wake it up if they are triggered while we are waiting. From e701b4462f1760ab96a097b6496fe6f236a48de4 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 18 Jan 2024 19:49:08 +0000 Subject: [PATCH 02/18] Fix cpplint issues. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 8 ++++---- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 5 +++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b34e68d4..4e3b53b2 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -111,8 +111,8 @@ void GraphCache::parse_put(const std::string & keyexpr) return graph_node.clients_; } }(entity, graph_node, entity_desc); - // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. - // Similarly, subscriptions and services are equivalent. + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; @@ -369,8 +369,8 @@ void GraphCache::parse_del(const std::string & keyexpr) return graph_node.clients_; } }(entity, graph_node, entity_desc); - // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. - // Similarly, subscriptions and services are equivalent. + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index adeb3e01..56357661 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -82,7 +82,6 @@ struct GraphNode // Entires for service/client. TopicMap clients_ = {}; TopicMap services_ = {}; - }; using GraphNodePtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0b4a50d3..f36a2b7b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2065,8 +2065,9 @@ rmw_send_request( opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // Latest consolidation guarantees unicity of replies for the same key expression. It optimizes bandwidth. - // Default is None which imples replies may come in any order and any number. + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); From eb4a78fd0b2e62231aed6baae41b4f3771c2fbc5 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 12 Jan 2024 16:44:09 +0000 Subject: [PATCH 03/18] Use unique_ptr for replies. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 4 ++-- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 5 +++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 21 ++++++++++++--------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 58621bb6..929dbec5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -145,9 +145,9 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } { - std::lock_guard msg_lock(client_data->message_mutex); + std::lock_guard msg_lock(client_data->replies_mutex); // Take ownership of the reply. - client_data->replies.emplace_back(*reply); + client_data->replies.emplace_back(std::make_unique(*reply)); *reply = z_reply_null(); } { diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index fda4873a..b724fc2c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -179,8 +179,8 @@ struct rmw_client_data_t // Liveliness token for the client. zc_owned_liveliness_token_t token; - std::mutex message_mutex; - std::deque replies; + std::mutex replies_mutex; + std::deque> replies; const void * request_type_support_impl; const void * response_type_support_impl; @@ -193,6 +193,7 @@ struct rmw_client_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + std::mutex sequence_number_mutex; size_t sequence_number{1}; }; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index f36a2b7b..32a29903 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1942,8 +1942,8 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - for (z_owned_reply_t & reply : client_data->replies) { - z_reply_drop(&reply); + for (std::unique_ptr & reply : client_data->replies) { + z_reply_drop(reply.get()); } client_data->replies.clear(); z_drop(z_move(client_data->token)); @@ -2046,8 +2046,10 @@ rmw_send_request( size_t data_length = ser.getSerializedDataLength(); - // TODO(clalancette): Locking for multiple requests at the same time - *sequence_id = client_data->sequence_number++; + { + std::lock_guard lock(client_data->sequence_number_mutex); + *sequence_id = client_data->sequence_number++; + } // Send request z_get_options_t opts = z_get_options_default(); @@ -2166,15 +2168,15 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - z_owned_reply_t * latest_reply = nullptr; + std::unique_ptr latest_reply = nullptr; - std::lock_guard lock(client_data->message_mutex); + std::lock_guard lock(client_data->replies_mutex); if (client_data->replies.empty()) { RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); return RMW_RET_ERROR; } - latest_reply = &client_data->replies.front(); - z_sample_t sample = z_reply_ok(latest_reply); + latest_reply = std::move(client_data->replies.front()); + z_sample_t sample = z_reply_ok(latest_reply.get()); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( @@ -2205,7 +2207,7 @@ rmw_take_response( *taken = true; - z_reply_drop(latest_reply); + z_reply_drop(latest_reply.get()); client_data->replies.pop_front(); return RMW_RET_OK; @@ -2536,6 +2538,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) for (std::unique_ptr & query : service_data->query_queue) { z_drop(z_move(*query.get())); } + service_data->query_queue.clear(); z_drop(z_move(service_data->token)); allocator->deallocate(service_data->request_type_support, allocator->state); From 257ddf1a8d02f7555a8c5c7916361883540a414a Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 12 Jan 2024 18:19:46 +0000 Subject: [PATCH 04/18] Use custom classes to wrap the queries and replies. This allows us to do RAII and drop things in all paths properly. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 40 ++++++++++++++++++- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 35 ++++++++++++++--- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 43 ++++++++------------- 3 files changed, 84 insertions(+), 34 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 929dbec5..99241ca5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -81,6 +81,21 @@ void sub_data_handler( } } +ZenohQuery::ZenohQuery(const z_query_t * query) +{ + query_ = z_query_clone(query); +} + +ZenohQuery::~ZenohQuery() +{ + z_drop(z_move(query_)); +} + +const z_query_t ZenohQuery::get_query() const +{ + return z_query_loan(&query_); +} + //============================================================================== void service_data_handler(const z_query_t * query, void * data) { @@ -108,7 +123,7 @@ void service_data_handler(const z_query_t * query, void * data) // Get the query parameters and payload { std::lock_guard lock(service_data->query_queue_mutex); - service_data->query_queue.push_back(std::make_unique(z_query_clone(query))); + service_data->query_queue.emplace_back(std::make_unique(query)); } { // Since we added new data, trigger the guard condition if it is available @@ -119,6 +134,27 @@ void service_data_handler(const z_query_t * query, void * data) } } +ZenohReply::ZenohReply(const z_owned_reply_t * reply) +{ + reply_ = *reply; +} + +ZenohReply::~ZenohReply() +{ + z_reply_drop(z_move(reply_)); +} + +const z_sample_t ZenohReply::get_sample() const +{ + return z_reply_ok(&reply_); +} + +size_t rmw_client_data_t::get_next_sequence_number() +{ + std::lock_guard lock(sequence_number_mutex); + return sequence_number++; +} + //============================================================================== void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -147,7 +183,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data) { std::lock_guard msg_lock(client_data->replies_mutex); // Take ownership of the reply. - client_data->replies.emplace_back(std::make_unique(*reply)); + client_data->replies.emplace_back(std::make_unique(reply)); *reply = z_reply_null(); } { diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index b724fc2c..e6e0c34c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -139,10 +139,21 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data); ///============================================================================== -struct rmw_service_data_t +class ZenohQuery final { - std::size_t get_new_uid(); +public: + ZenohQuery(const z_query_t * query); + + ~ZenohQuery(); + const z_query_t get_query() const; + +private: + z_owned_query_t query_; +}; + +struct rmw_service_data_t +{ z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; @@ -158,11 +169,11 @@ struct rmw_service_data_t rmw_context_t * context; // Deque to store the queries in the order they arrive. - std::deque> query_queue; + std::deque> query_queue; std::mutex query_queue_mutex; // Map to store the sequence_number -> query_id - std::unordered_map> sequence_to_query_map; + std::unordered_map> sequence_to_query_map; std::mutex sequence_to_query_map_mutex; std::mutex internal_mutex; @@ -171,6 +182,19 @@ struct rmw_service_data_t ///============================================================================== +class ZenohReply final +{ +public: + ZenohReply(const z_owned_reply_t * reply); + + ~ZenohReply(); + + const z_sample_t get_sample() const; + +private: + z_owned_reply_t reply_; +}; + struct rmw_client_data_t { z_owned_keyexpr_t keyexpr; @@ -180,7 +204,7 @@ struct rmw_client_data_t zc_owned_liveliness_token_t token; std::mutex replies_mutex; - std::deque> replies; + std::deque> replies; const void * request_type_support_impl; const void * response_type_support_impl; @@ -193,6 +217,7 @@ struct rmw_client_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + size_t get_next_sequence_number(); std::mutex sequence_number_mutex; size_t sequence_number{1}; }; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 32a29903..ad5eebd7 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1942,9 +1942,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - for (std::unique_ptr & reply : client_data->replies) { - z_reply_drop(reply.get()); - } client_data->replies.clear(); z_drop(z_move(client_data->token)); @@ -2046,10 +2043,7 @@ rmw_send_request( size_t data_length = ser.getSerializedDataLength(); - { - std::lock_guard lock(client_data->sequence_number_mutex); - *sequence_id = client_data->sequence_number++; - } + *sequence_id = client_data->get_next_sequence_number(); // Send request z_get_options_t opts = z_get_options_default(); @@ -2168,15 +2162,17 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - std::unique_ptr latest_reply = nullptr; - - std::lock_guard lock(client_data->replies_mutex); - if (client_data->replies.empty()) { - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); - return RMW_RET_ERROR; + std::unique_ptr latest_reply = nullptr; + { + std::lock_guard lock(client_data->replies_mutex); + if (client_data->replies.empty()) { + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; + } + latest_reply = std::move(client_data->replies.front()); + client_data->replies.pop_front(); } - latest_reply = std::move(client_data->replies.front()); - z_sample_t sample = z_reply_ok(latest_reply.get()); + const z_sample_t sample = latest_reply->get_sample(); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( @@ -2207,9 +2203,6 @@ rmw_take_response( *taken = true; - z_reply_drop(latest_reply.get()); - client_data->replies.pop_front(); - return RMW_RET_OK; } @@ -2535,9 +2528,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - for (std::unique_ptr & query : service_data->query_queue) { - z_drop(z_move(*query.get())); - } + service_data->sequence_to_query_map.clear(); service_data->query_queue.clear(); z_drop(z_move(service_data->token)); @@ -2581,16 +2572,17 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - std::unique_ptr query; + std::unique_ptr query = nullptr; { std::lock_guard lock(service_data->query_queue_mutex); if (service_data->query_queue.empty()) { return RMW_RET_OK; } query = std::move(service_data->query_queue.front()); + service_data->query_queue.pop_front(); } - const z_query_t loaned_query = z_query_loan(query.get()); + const z_query_t loaned_query = query->get_query(); // Get the sequence_number out of the attachment z_attachment_t attachment = z_query_attachment(&loaned_query); @@ -2640,8 +2632,6 @@ rmw_take_request( // and received_timestamp request_header->request_id.sequence_number = sequence_number; - service_data->query_queue.pop_front(); - *taken = true; return RMW_RET_OK; @@ -2720,7 +2710,7 @@ rmw_send_response( RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t loaned_query = z_query_loan(query_it->second.get()); + const z_query_t loaned_query = query_it->second->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); // TODO(clalancette): We also need to fill in and send the writer_guid @@ -2741,7 +2731,6 @@ rmw_send_response( &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length, &options); - z_drop(z_move(*query_it->second.get())); service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } From d3fc35300cac9ed3437a28267363ad02d32b4e70 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 16 Jan 2024 15:41:16 +0000 Subject: [PATCH 05/18] Revamp rmw_shutdown and rmw_context_fini. rmw_context_fini is called during the atexit handler, so we can't do complex things like shutdown the session. Instead, switch to shutting down the session during rmw_shutdown. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_init.cpp | 48 ++++++++++++++++----------------- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 4 +++ 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 1b612d88..35565ed2 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -351,6 +351,28 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // We destroy zenoh artifacts here instead of rmw_shutdown() as + // rmw_shutdown() is invoked before rmw_destroy_node() however we still need the session + // alive for the latter. + // TODO(Yadunund): Check if this is a bug in rmw. + z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + // Close the zenoh session + if (z_close(z_move(context->impl->session)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + + const rcutils_allocator_t * allocator = &context->options.allocator; + + z_drop(z_move(context->impl->shm_manager)); + + RMW_TRY_DESTRUCTOR( + static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), + GuardCondition, ); + allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + + allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + context->impl->is_shutdown = true; return RMW_RET_OK; } @@ -375,32 +397,10 @@ rmw_context_fini(rmw_context_t * context) return RMW_RET_INVALID_ARGUMENT; } - // We destroy zenoh artifacts here instead of rmw_shutdown() as - // rmw_shutdown() is invoked before rmw_destroy_node() however we still need the session - // alive for the latter. - // TODO(Yadunund): Check if this is a bug in rmw. - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - // Close the zenoh session - if (z_close(z_move(context->impl->session)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } - - const rcutils_allocator_t * allocator = &context->options.allocator; - - z_drop(z_move(context->impl->shm_manager)); - - RMW_TRY_DESTRUCTOR( - static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), - GuardCondition, ); - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); - - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + rmw_ret_t ret = rmw_init_options_fini(&context->options); RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, ); - allocator->deallocate(context->impl, allocator->state); - - rmw_ret_t ret = rmw_init_options_fini(&context->options); + // context->impl will be deallocated by rcl *context = rmw_get_zero_initialized_context(); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index ad5eebd7..3f394e66 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1483,6 +1483,10 @@ static rmw_ret_t __rmw_take( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + if (sub_data->context->impl->is_shutdown) { + return RMW_RET_OK; + } + // RETRIEVE SERIALIZED MESSAGE =============================================== std::unique_ptr msg_data; From b6d5bc4686cfd0934805234bc68f3f258fb15840 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 16 Jan 2024 17:37:03 +0000 Subject: [PATCH 06/18] Make sure to check z_reply_is_ok. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 9 +++++++-- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 3 ++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 17 +++++++++++++---- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 99241ca5..faf9f35a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include "rcpputils/scope_exit.hpp" @@ -144,9 +145,13 @@ ZenohReply::~ZenohReply() z_reply_drop(z_move(reply_)); } -const z_sample_t ZenohReply::get_sample() const +std::optional ZenohReply::get_sample() const { - return z_reply_ok(&reply_); + if (z_reply_is_ok(&reply_)) { + return z_reply_ok(&reply_); + } + + return std::nullopt; } size_t rmw_client_data_t::get_next_sequence_number() diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index e6e0c34c..4b4e468e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -189,7 +190,7 @@ class ZenohReply final ~ZenohReply(); - const z_sample_t get_sample() const; + std::optional get_sample() const; private: z_owned_reply_t reply_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3f394e66..114edb10 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -19,9 +19,13 @@ #include #include +#include #include #include +#include #include +#include +#include #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" @@ -2176,12 +2180,16 @@ rmw_take_response( latest_reply = std::move(client_data->replies.front()); client_data->replies.pop_front(); } - const z_sample_t sample = latest_reply->get_sample(); + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); + return RMW_RET_ERROR; + } // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample.payload.start)), - sample.payload.len); + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); // Object that serializes the data eprosima::fastcdr::Cdr deser( @@ -2197,7 +2205,8 @@ rmw_take_response( return RMW_RET_ERROR; } - request_header->request_id.sequence_number = get_sequence_num_from_attachment(&sample.attachment); + request_header->request_id.sequence_number = + get_sequence_num_from_attachment(&sample->attachment); if (request_header->request_id.sequence_number < 0) { // get_sequence_num_from_attachment already set an error return RMW_RET_ERROR; From a748c3de0b40329cfe4c350146bace583e2cb975 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 16 Jan 2024 17:57:08 +0000 Subject: [PATCH 07/18] Add in header includes as needed. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 1 + rmw_zenoh_cpp/src/rmw_event.cpp | 1 + rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp | 3 +++ rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp | 3 +++ rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp | 3 +++ rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp | 3 +++ rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp | 4 +++- rmw_zenoh_cpp/src/rmw_init.cpp | 1 + rmw_zenoh_cpp/src/rmw_init_options.cpp | 2 +- 9 files changed, 19 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index faf9f35a..2f8ce28d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -14,6 +14,7 @@ #include +#include #include #include #include diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index d547e4c0..e7e44a4b 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -16,6 +16,7 @@ #include "rmw/error_handling.h" #include "rmw/event.h" +#include "rmw/types.h" #include "detail/identifier.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp b/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp index 381f3dbb..6ad3f52d 100644 --- a/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "rcutils/allocator.h" + #include "rmw/get_network_flow_endpoints.h" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index 79e069ab..abcb6058 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -16,8 +16,11 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/get_node_info_and_types.h" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 9cf19aae..34f747f5 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -14,8 +14,11 @@ #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/error_handling.h" #include "rmw/get_service_names_and_types.h" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index 12795f41..d6f3b690 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -17,8 +17,11 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index c5218e1e..7e2f3d24 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -14,14 +14,16 @@ #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/error_handling.h" #include "rmw/get_topic_names_and_types.h" +#include "rmw/types.h" extern "C" { ///============================================================================== /// Return all topic names and types in the ROS graph. -// TODO(yadunund): Fix implementation once discovery information can be cached. rmw_ret_t rmw_get_topic_names_and_types( const rmw_node_t * node, diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 35565ed2..3239027f 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -15,6 +15,7 @@ #include #include +#include #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init_options.cpp b/rmw_zenoh_cpp/src/rmw_init_options.cpp index c34a6262..9eb146a4 100644 --- a/rmw_zenoh_cpp/src/rmw_init_options.cpp +++ b/rmw_zenoh_cpp/src/rmw_init_options.cpp @@ -17,7 +17,7 @@ #include "detail/identifier.hpp" #include "detail/rmw_init_options_impl.hpp" - +#include "rcutils/allocator.h" #include "rcutils/strdup.h" #include "rcutils/types.h" From 9c9280706b938b00a62a0e3e572c731b1ff28222 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 16 Jan 2024 19:17:06 +0000 Subject: [PATCH 08/18] Remove a lot of debugging statements. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 36 +----------------- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 4 +- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 4 -- rmw_zenoh_cpp/src/detail/zenoh_config.cpp | 7 ---- .../src/detail/zenoh_router_check.cpp | 10 +---- rmw_zenoh_cpp/src/rmw_init.cpp | 19 +--------- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 38 +------------------ 7 files changed, 7 insertions(+), 111 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 4e3b53b2..3282ade0 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -172,15 +172,6 @@ void GraphCache::parse_put(const std::string & keyexpr) topic_data_insertion.first->second->stats_.sub_count_ += sub_count; } } - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Added %s on topic %s with type %s and qos %s to node /%s.", - entity_desc.c_str(), - topic_info.name_.c_str(), - topic_info.type_.c_str(), - topic_info.qos_.c_str(), - graph_node.name_.c_str()); }; // Helper lambda to convert an Entity into a GraphNode. @@ -213,10 +204,6 @@ void GraphCache::parse_put(const std::string & keyexpr) NodeMap node_map = { {entity.node_name(), make_graph_node(entity, *this)}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", - entity.node_name().c_str(), - entity.node_namespace().c_str()); return; } @@ -238,14 +225,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // name but unique id. NodeMap::iterator insertion_it = ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); - if (insertion_it != ns_it->second.end()) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Added a new node /%s with id %s to an existing namespace %s in the graph.", - entity.node_name().c_str(), - entity.id().c_str(), - entity.node_namespace().c_str()); - } else { + if (insertion_it == ns_it->second.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to add a new node /%s with id %s an " @@ -409,15 +389,6 @@ void GraphCache::parse_del(const std::string & keyexpr) // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache); - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Removed %s on topic %s with type %s and qos %s to node /%s.", - entity_desc.c_str(), - topic_info.name_.c_str(), - topic_info.type_.c_str(), - topic_info.qos_.c_str(), - graph_node.name_.c_str()); }; // Lock the graph mutex before accessing the graph. @@ -484,11 +455,6 @@ void GraphCache::parse_del(const std::string & keyexpr) remove_topics(graph_node->clients_, EntityType::Client, *this); } ns_it->second.erase(node_it); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed node /%s from the graph.", - entity.node_name().c_str() - ); return; } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 7118b64b..00718196 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -348,7 +348,7 @@ bool PublishToken::put( RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); return false; } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); + z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { @@ -379,7 +379,7 @@ bool PublishToken::del( RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); return false; } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); + const z_delete_options_t options = z_delete_options_default(); if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 2f8ce28d..ee52b895 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -101,10 +101,6 @@ const z_query_t ZenohQuery::get_query() const //============================================================================== void service_data_handler(const z_query_t * query, void * data) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[service_data_handler] triggered" - ); z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); auto drop_keystr = rcpputils::make_scope_exit( [&keystr]() { diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index dd799dfe..8f4e3ef0 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -50,19 +50,12 @@ rmw_ret_t get_z_config(z_owned_config_t * config) if (zenoh_config_path[0] != '\0') { // If the environment variable is set, try to read the configuration from the file. *config = zc_config_from_file(zenoh_config_path); - RCUTILS_LOG_INFO_NAMED( - "ZenohConfiguration", - "Using zenoh configuration file pointed by '%s' envar: '%s'", kZenohConfigFileEnvVar, - zenoh_config_path); } else { // If the environment variable is not set use internal configuration static const std::string path_to_config_folder = ament_index_cpp::get_package_share_directory(rmw_zenoh_identifier) + "/config/"; const std::string default_zconfig_path = path_to_config_folder + kDefaultZenohConfigFileName; *config = zc_config_from_file(default_zconfig_path.c_str()); - RCUTILS_LOG_INFO_NAMED( - "ZenohConfiguration", - "Using default zenoh configuration file at '%s'", default_zconfig_path.c_str()); } // Verify that the configuration is valid. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp index 2f6ed398..353de39a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp @@ -56,15 +56,12 @@ rmw_ret_t zenoh_router_check(z_session_t session) // Define callback auto callback = [](const struct z_id_t * id, void * ctx) { const std::string id_str = zid_to_str(*id); - RCUTILS_LOG_INFO_NAMED( - "ZenohRouterCheck", - "A Zenoh router connected to the session with id '%s'", id_str.c_str()); // Note: Callback is guaranteed to never be called // concurrently according to z_info_routers_zid docstring (*(static_cast(ctx)))++; }; - rmw_ret_t ret; + rmw_ret_t ret = RMW_RET_OK; z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context); if (z_info_routers_zid(session, z_move(router_callback))) { RCUTILS_LOG_ERROR_NAMED( @@ -77,11 +74,6 @@ rmw_ret_t zenoh_router_check(z_session_t session) "ZenohRouterCheck", "No Zenoh router connected to the session"); ret = RMW_RET_ERROR; - } else { - RCUTILS_LOG_INFO_NAMED( - "ZenohRouterCheck", - "There are %d Zenoh routers connected to the session", context); - ret = RMW_RET_OK; } } diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 3239027f..af3af26e 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -47,11 +47,6 @@ static void graph_sub_data_handler( { (void)data; z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Received key '%s'", - z_loan(keystr) - ); // Get the context impl from data. rmw_context_impl_s * context_impl = static_cast( @@ -251,11 +246,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const std::string liveliness_str = liveliness::subscription_token(context->actual_domain_id); // Query router/liveliness participants to get graph information before this session was started. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data...", - liveliness_str.c_str() - ); + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive // replies for the zc_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which @@ -276,9 +267,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) if (z_reply_is_ok(&reply)) { z_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf( - ">> [discovery] Received ('%s': '%.*s')\n", z_loan(keystr), - static_cast(sample.payload.len), sample.payload.start); context->impl->graph_cache.parse_put(z_loan(keystr)); z_drop(z_move(keystr)); } else { @@ -289,11 +277,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_drop(z_move(channel)); // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Setting up liveliness subscription on key: %s", - liveliness_str.c_str() - ); // Uncomment and rely on #if #endif blocks to enable this feature when building with // zenoh-pico since liveliness is only available in zenoh-c. diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 114edb10..64148467 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1679,11 +1679,6 @@ rmw_create_client( const char * service_name, const rmw_qos_profile_t * qos_profile) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_common_cpp", - "[rmw_create_client] %s with queue of depth %ld", - service_name, - qos_profile->depth); RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -1991,8 +1986,6 @@ rmw_send_request( const void * ros_request, int64_t * sequence_id) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_send_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); @@ -2152,7 +2145,6 @@ rmw_take_response( bool * taken) { *taken = false; - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); @@ -2254,11 +2246,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profiles) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[rmw_create_service] %s", - service_name); - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -2565,7 +2552,6 @@ rmw_take_request( bool * taken) { *taken = false; - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); @@ -2658,9 +2644,6 @@ rmw_send_response( rmw_request_id_t * request_header, void * ros_response) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_send_response]"); - RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); @@ -2948,30 +2931,14 @@ rmw_wait( rmw_wait_set_t * wait_set, const rmw_time_t * wait_timeout) { + static_cast(events); + RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( wait set handle, wait_set->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - // TODO(yadunund): Switch to debug log level. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[rmw_wait] %ld subscriptions, %ld services, %ld clients, %ld events, %ld guard conditions", - subscriptions->subscriber_count, - services->service_count, - clients->client_count, - events->event_count, - guard_conditions->guard_condition_count); - - // TODO(yadunund): Switch to debug log level. - if (wait_timeout) { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_common_cpp", "[rmw_wait] TIMEOUT: %ld s %ld ns", - wait_timeout->sec, - wait_timeout->nsec); - } - auto wait_set_data = static_cast(wait_set->data); RMW_CHECK_FOR_NULL_WITH_MSG( wait_set_data, @@ -3100,7 +3067,6 @@ rmw_wait( } } - return RMW_RET_OK; } From 1c9638ac647d7c8cbeffc7061b5034b1d43beb6a Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 16 Jan 2024 19:17:31 +0000 Subject: [PATCH 09/18] Add in source timestamps to service requests. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 64148467..c3425174 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1965,6 +1965,10 @@ static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_numb RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); return z_bytes_map_null(); } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. @@ -1975,6 +1979,17 @@ static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_numb } z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + char source_ts_str[20]; + if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); + + free_attachment_map.cancel(); + return map; } From d734356855a8c076789c47eb9b629988ee6f9b5a Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 16 Jan 2024 19:43:36 +0000 Subject: [PATCH 10/18] Add in source_timestamp for services. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 99 ++++++++++++++---------- 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 56357661..85e08a15 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -61,7 +61,6 @@ struct TopicData using TopicDataPtr = std::shared_ptr; ///============================================================================= -// TODO(Yadunund): Expand to services and clients. struct GraphNode { std::string id_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index c3425174..613f0b91 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2091,63 +2091,62 @@ rmw_send_request( return RMW_RET_OK; } -static int64_t get_sequence_num_from_attachment(const z_attachment_t * const attachment) +static int64_t get_int64_from_attachment( + const z_attachment_t * const attachment, const std::string & name) { - // Get the sequence_number out of the attachment if (!z_check(*attachment)) { // A valid request must have had an attachment RMW_SET_ERROR_MSG("Could not get attachment from query"); return -1; } - z_bytes_t sequence_num_index = z_attachment_get(*attachment, z_bytes_new("sequence_number")); - if (!z_check(sequence_num_index)) { - // A valid request must have had a sequence number attached - RMW_SET_ERROR_MSG("Could not get sequence number from query"); + z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str())); + if (!z_check(index)) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Could not get %s from attachment", name.c_str()); return -1; } - if (sequence_num_index.len < 1) { - RMW_SET_ERROR_MSG("No value specified for the sequence number"); + if (index.len < 1) { + RMW_SET_ERROR_MSG("no value specified"); return -1; } - if (sequence_num_index.len > 19) { - // The sequence number was larger than we expected - RMW_SET_ERROR_MSG("Sequence number too large"); + if (index.len > 19) { + // The number was larger than we expected + RMW_SET_ERROR_MSG("number too large"); return -1; } // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char sequence_num_str[20]; + char int64_str[20]; - memcpy(sequence_num_str, sequence_num_index.start, sequence_num_index.len); - sequence_num_str[sequence_num_index.len] = '\0'; + memcpy(int64_str, index.start, index.len); + int64_str[index.len] = '\0'; errno = 0; char * endptr; - int64_t seqnum = strtol(sequence_num_str, &endptr, 10); - if (seqnum == 0) { + int64_t num = strtol(int64_str, &endptr, 10); + if (num == 0) { // This is an error regardless; the client should never send this - RMW_SET_ERROR_MSG("A invalid zero value sent as the sequence number"); + RMW_SET_ERROR_MSG("a invalid zero value sent"); return -1; - } else if (endptr == sequence_num_str) { + } else if (endptr == int64_str) { // No values were converted, this is an error - RMW_SET_ERROR_MSG("No valid numbers available in the sequence number"); + RMW_SET_ERROR_MSG("no valid numbers available"); return -1; } else if (*endptr != '\0') { // There was junk after the number - RMW_SET_ERROR_MSG("Non-numeric values in the sequence number"); + RMW_SET_ERROR_MSG("non-numeric values"); return -1; } else if (errno != 0) { // Some other error occurred, which may include overflow or underflow RMW_SET_ERROR_MSG( - "An undefined error occurred while getting the sequence number, this may be an overflow"); + "an undefined error occurred while getting the number, this may be an overflow"); return -1; } - return seqnum; + return num; } //============================================================================== @@ -2212,14 +2211,23 @@ rmw_take_response( return RMW_RET_ERROR; } + // Fill in the request_header + request_header->request_id.sequence_number = - get_sequence_num_from_attachment(&sample->attachment); + get_int64_from_attachment(&sample->attachment, "sequence_number"); if (request_header->request_id.sequence_number < 0) { - // get_sequence_num_from_attachment already set an error + // get_int64_from_attachment already set an error return RMW_RET_ERROR; } - // TODO(clalancette): We also need to fill in the source_timestamp, received_timestamp, - // and writer_guid + + request_header->source_timestamp = + get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + // get_int64_from_attachment already set an error + return RMW_RET_ERROR; + } + + // TODO(clalancette): We also need to fill in the received_timestamp and writer_guid *taken = true; @@ -2598,15 +2606,6 @@ rmw_take_request( const z_query_t loaned_query = query->get_query(); - // Get the sequence_number out of the attachment - z_attachment_t attachment = z_query_attachment(&loaned_query); - - int64_t sequence_number = get_sequence_num_from_attachment(&attachment); - if (sequence_number < 0) { - // get_sequence_number_from_attachment already set the error - return RMW_RET_ERROR; - } - // DESERIALIZE MESSAGE ======================================================== z_value_t payload_value = z_query_value(&loaned_query); @@ -2629,23 +2628,39 @@ rmw_take_request( return RMW_RET_ERROR; } + // Fill in the request header. + + // Get the sequence_number out of the attachment + z_attachment_t attachment = z_query_attachment(&loaned_query); + + request_header->request_id.sequence_number = + get_int64_from_attachment(&attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + // get_int64_from_attachment already set the error + return RMW_RET_ERROR; + } + + request_header->source_timestamp = get_int64_from_attachment(&attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + // get_int64_from_attachment already set the error + return RMW_RET_ERROR; + } + + // TODO(clalancette): We also need to fill in writer_guid and received_timestamp + // Add this query to the map, so that rmw_send_response can quickly look it up later { std::lock_guard lock(service_data->sequence_to_query_map_mutex); - if (service_data->sequence_to_query_map.find(sequence_number) != + if (service_data->sequence_to_query_map.find(request_header->request_id.sequence_number) != service_data->sequence_to_query_map.end()) { RMW_SET_ERROR_MSG("duplicate sequence number in the map"); return RMW_RET_ERROR; } - service_data->sequence_to_query_map.emplace(std::pair(sequence_number, std::move(query))); + service_data->sequence_to_query_map.emplace( + std::pair(request_header->request_id.sequence_number, std::move(query))); } - // Fill in the request header. - // TODO(clalancette): We also need to fill in writer_guid, source_timestamp, - // and received_timestamp - request_header->request_id.sequence_number = sequence_number; - *taken = true; return RMW_RET_OK; From 4a2270b664546ae8065d7d70d5f1c164a0b60a1b Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 17 Jan 2024 13:50:49 +0000 Subject: [PATCH 11/18] Run the constructor for rmw_publisher_data_t. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 613f0b91..62117c00 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -525,6 +525,13 @@ rmw_create_publisher( allocator->deallocate(publisher_data, allocator->state); }); + RMW_TRY_PLACEMENT_NEW(publisher_data, publisher_data, return nullptr, rmw_publisher_data_t); + auto destruct_publisher_data = rcpputils::make_scope_exit( + [publisher_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t); + }); + publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -654,6 +661,7 @@ rmw_create_publisher( free_topic_name.cancel(); destruct_msg_type_support.cancel(); free_type_support.cancel(); + destruct_publisher_data.cancel(); free_publisher_data.cancel(); free_rmw_publisher.cancel(); @@ -711,6 +719,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) RMW_SET_ERROR_MSG("failed to undeclare pub"); ret = RMW_RET_ERROR; } + RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); allocator->deallocate(publisher_data, allocator->state); } allocator->deallocate(const_cast(publisher->topic_name), allocator->state); From c6fec3a7ebffd6055cc0628991490dec7b610bbc Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 17 Jan 2024 14:15:04 +0000 Subject: [PATCH 12/18] Don't pass unnecessary argument into __rmw_take. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 62117c00..5b45e74c 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1475,8 +1475,7 @@ static rmw_ret_t __rmw_take( const rmw_subscription_t * subscription, void * ros_message, bool * taken, - rmw_message_info_t * message_info, - rmw_subscription_allocation_t * allocation) + rmw_message_info_t * message_info) { RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); @@ -1489,8 +1488,6 @@ static rmw_ret_t __rmw_take( subscription->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - static_cast(allocation); - *taken = false; auto sub_data = static_cast(subscription->data); @@ -1561,9 +1558,11 @@ rmw_take( bool * taken, rmw_subscription_allocation_t * allocation) { + static_cast(allocation); + rmw_message_info_t dummy_msg_info; - return __rmw_take(subscription, ros_message, taken, &dummy_msg_info, allocation); + return __rmw_take(subscription, ros_message, taken, &dummy_msg_info); } //============================================================================== @@ -1576,7 +1575,8 @@ rmw_take_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - return __rmw_take(subscription, ros_message, taken, message_info, allocation); + static_cast(allocation); + return __rmw_take(subscription, ros_message, taken, message_info); } //============================================================================== From 5c735abf4e0be18764db82184cac8ebf54b2a7ac Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 17 Jan 2024 18:22:54 +0000 Subject: [PATCH 13/18] Fix up memory leaks in clients, services, and the context. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_init.cpp | 10 +++++++--- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 18 ++++++++++++++++-- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index af3af26e..9cff5ca2 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -358,6 +358,7 @@ rmw_shutdown(rmw_context_t * context) allocator->deallocate(context->impl->graph_guard_condition, allocator->state); context->impl->is_shutdown = true; + return RMW_RET_OK; } @@ -381,10 +382,13 @@ rmw_context_fini(rmw_context_t * context) return RMW_RET_INVALID_ARGUMENT; } - rmw_ret_t ret = rmw_init_options_fini(&context->options); - RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, ); - // context->impl will be deallocated by rcl + + const rcutils_allocator_t * allocator = &context->options.allocator; + + allocator->deallocate(context->impl, allocator->state); + + rmw_ret_t ret = rmw_init_options_fini(&context->options); *context = rmw_get_zero_initialized_context(); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5b45e74c..5d532ba3 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1912,11 +1912,11 @@ rmw_create_client( free_rmw_client.cancel(); free_client_data.cancel(); - free_request_type_support.cancel(); destruct_request_type_support.cancel(); + free_request_type_support.cancel(); + destruct_response_type_support.cancel(); free_response_type_support.cancel(); destruct_client_data.cancel(); - destruct_response_type_support.cancel(); free_service_name.cancel(); free_ros_keyexpr.cancel(); @@ -1957,8 +1957,15 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client_data->replies.clear(); z_drop(z_move(client_data->token)); + RMW_TRY_DESTRUCTOR( + client_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); allocator->deallocate(client_data->request_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR( + client_data->response_type_support->~ResponseTypeSupport(), ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + allocator->deallocate(client->data, allocator->state); allocator->deallocate(const_cast(client->service_name), allocator->state); @@ -2564,8 +2571,15 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) service_data->query_queue.clear(); z_drop(z_move(service_data->token)); + RMW_TRY_DESTRUCTOR( + service_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); allocator->deallocate(service_data->request_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR( + service_data->response_type_support->~ResponseTypeSupport(), ResponseTypeSupport, ); allocator->deallocate(service_data->response_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR(service_data->~rmw_service_data_t(), rmw_service_data_t, ); allocator->deallocate(service->data, allocator->state); allocator->deallocate(const_cast(service->service_name), allocator->state); From b2bfac2fbec42aaaf6f3d685b548840d540c68b8 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 17 Jan 2024 18:24:15 +0000 Subject: [PATCH 14/18] Fill in the received_timetstamp. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5d532ba3..a74f3b6b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2243,6 +2243,10 @@ rmw_take_response( return RMW_RET_ERROR; } + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + // TODO(clalancette): We also need to fill in the received_timestamp and writer_guid *taken = true; @@ -2669,7 +2673,9 @@ rmw_take_request( return RMW_RET_ERROR; } - // TODO(clalancette): We also need to fill in writer_guid and received_timestamp + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); // Add this query to the map, so that rmw_send_response can quickly look it up later { From d339d60723f65b7775e701b37c709512cf7a9294 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 17 Jan 2024 20:02:48 +0000 Subject: [PATCH 15/18] Add in a writer_guid to the client and service requests. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 2 + rmw_zenoh_cpp/src/rmw_zenoh.cpp | 72 +++++++++++++++++++-- 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 4b4e468e..aa18cf06 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -218,6 +218,8 @@ struct rmw_client_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + uint8_t client_guid[RMW_GID_STORAGE_SIZE]; + size_t get_next_sequence_number(); std::mutex sequence_number_mutex; size_t sequence_number{1}; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a74f3b6b..27e710ae 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -19,10 +19,12 @@ #include #include +#include #include #include #include #include +#include #include #include #include @@ -1679,6 +1681,18 @@ rmw_return_loaned_message_from_subscription( return RMW_RET_UNSUPPORTED; } +static void generate_random_guid(uint8_t guid[RMW_GID_STORAGE_SIZE]) +{ + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist( + std::numeric_limits::min(), std::numeric_limits::max()); + + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + guid[i] = dist(rng); + } +} + //============================================================================== /// Create a service client that can send requests to and receive replies from a service server. rmw_client_t * @@ -1772,6 +1786,8 @@ rmw_create_client( rmw_client_data_t); }); + generate_random_guid(client_data->client_guid); + // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -1974,7 +1990,8 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_OK; } -static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number) +static z_owned_bytes_map_t create_map_and_set_sequence_num( + int64_t sequence_number, uint8_t guid[RMW_GID_STORAGE_SIZE]) { z_owned_bytes_map_t map = z_bytes_map_new(); if (!z_check(map)) { @@ -2004,6 +2021,15 @@ static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_numb } z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); + z_bytes_t guid_bytes; + guid_bytes.len = RMW_GID_STORAGE_SIZE; + guid_bytes.start = static_cast(malloc(RMW_GID_STORAGE_SIZE)); + memcpy(static_cast(const_cast(guid_bytes.start)), guid, RMW_GID_STORAGE_SIZE); + + z_bytes_map_insert_by_copy(&map, z_bytes_new("client_guid"), guid_bytes); + + free(const_cast(guid_bytes.start)); + free_attachment_map.cancel(); return map; @@ -2080,7 +2106,7 @@ rmw_send_request( // Send request z_get_options_t opts = z_get_options_default(); - z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id); + z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id, client_data->client_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -2165,6 +2191,30 @@ static int64_t get_int64_from_attachment( return num; } +static bool get_client_guid_from_attachment( + const z_attachment_t * const attachment, uint8_t guid[RMW_GID_STORAGE_SIZE]) +{ + if (!z_check(*attachment)) { + RMW_SET_ERROR_MSG("Could not get client_guid from attachment"); + return false; + } + + z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("client_guid")); + if (!z_check(index)) { + RMW_SET_ERROR_MSG("Could not get client_guid from attachment"); + return false; + } + + if (index.len != RMW_GID_STORAGE_SIZE) { + RMW_SET_ERROR_MSG("Invalid size for GUID storage"); + return false; + } + + memcpy(guid, index.start, index.len); + + return true; +} + //============================================================================== /// Take an incoming ROS service response. rmw_ret_t @@ -2243,12 +2293,17 @@ rmw_take_response( return RMW_RET_ERROR; } + if (!get_client_guid_from_attachment( + &sample->attachment, request_header->request_id.writer_guid)) + { + // get_client_guid_from_attachment already set an error + return RMW_RET_ERROR; + } + auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); request_header->received_timestamp = now_ns.count(); - // TODO(clalancette): We also need to fill in the received_timestamp and writer_guid - *taken = true; return RMW_RET_OK; @@ -2673,6 +2728,11 @@ rmw_take_request( return RMW_RET_ERROR; } + if (!get_client_guid_from_attachment(&attachment, request_header->request_id.writer_guid)) { + // get_client_guid_from_attachment already set an error + return RMW_RET_ERROR; + } + auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); request_header->received_timestamp = now_ns.count(); @@ -2768,8 +2828,8 @@ rmw_send_response( const z_query_t loaned_query = query_it->second->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); - // TODO(clalancette): We also need to fill in and send the writer_guid - z_owned_bytes_map_t map = create_map_and_set_sequence_num(request_header->sequence_number); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + request_header->sequence_number, request_header->writer_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; From 62c7b0fdd36c7b92100c34c94fd78e35cdcc5221 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 18 Jan 2024 19:10:04 +0000 Subject: [PATCH 16/18] Revamp rmw_wait. There are more comments in the code, but in particular this change makes sure we deal with data that is already ready, rather than always waiting. With this in place, services seem to work as promised. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 178 +++++++++++++++++++++++--------- 1 file changed, 132 insertions(+), 46 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 27e710ae..492f7014 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3038,90 +3038,173 @@ rmw_destroy_wait_set(rmw_wait_set_t * wait_set) return RMW_RET_OK; } -//============================================================================== -/// Waits on sets of different entities and returns when one is ready. -rmw_ret_t -rmw_wait( +static bool has_triggered_condition( rmw_subscriptions_t * subscriptions, rmw_guard_conditions_t * guard_conditions, rmw_services_t * services, rmw_clients_t * clients, - rmw_events_t * events, - rmw_wait_set_t * wait_set, - const rmw_time_t * wait_timeout) + rmw_events_t * events) { static_cast(events); - RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - wait set handle, - wait_set->implementation_identifier, rmw_zenoh_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - auto wait_set_data = static_cast(wait_set->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - wait_set_data, - "waitset data struct is null", - return RMW_RET_ERROR); - if (guard_conditions) { - // Go through each of the guard conditions, and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { - // This is hard to track down, but each of the (void *) pointers in - // guard_conditions->guard_conditions points to the data field of the related - // rmw_guard_condition_t. So we can directly cast it to GuardCondition. GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr) { - gc->attach_condition(&wait_set_data->condition_variable); + if (gc != nullptr && gc->has_triggered()) { + return true; } } } + // TODO(clalancette): Deal with events + if (subscriptions) { - // Go through each of the subscriptions and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - sub_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(sub_data->internal_mutex); + if (!sub_data->message_queue.empty()) { + return true; + } } } } if (services) { - // Go through each of the services and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - serv_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(serv_data->internal_mutex); + if (!serv_data->query_queue.empty()) { + return true; + } } } } if (clients) { - // Go through each of the clients and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - client_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(client_data->internal_mutex); + if (!client_data->replies.empty()) { + return true; + } } } } - std::unique_lock lock(wait_set_data->condition_mutex); + return false; +} - // According to the RMW documentation, if wait_timeout is NULL that means - // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait - // for that amount of time. - if (wait_timeout == nullptr) { - wait_set_data->condition_variable.wait(lock); - } else { - if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { - wait_set_data->condition_variable.wait_for( - lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); +//============================================================================== +/// Waits on sets of different entities and returns when one is ready. +rmw_ret_t +rmw_wait( + rmw_subscriptions_t * subscriptions, + rmw_guard_conditions_t * guard_conditions, + rmw_services_t * services, + rmw_clients_t * clients, + rmw_events_t * events, + rmw_wait_set_t * wait_set, + const rmw_time_t * wait_timeout) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + wait set handle, + wait_set->implementation_identifier, rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + auto wait_set_data = static_cast(wait_set->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + wait_set_data, + "waitset data struct is null", + return RMW_RET_ERROR); + + // rmw_wait should return *all* entities that have data available, and let the caller decide + // how to handle them. + // + // If there is no data currently available in any of the entities we were told to wait on, we + // we attach a context-global condition variable to each entity, calculate a timeout based on + // wait_timeout, and then sleep on the condition variable. If any of the entities has an event + // during that time, it will wake up from that sleep. + // + // If there is data currently available in one or more of the entities, then we'll skip attaching + // the condition variable, and skip the sleep, and instead just go to the last part. + // + // In the last part, we check every entity and see if there are conditions that make it ready. + // If that entity is not ready, then we set the pointer to it to nullptr in the wait set, which + // signals to the upper layers that it isn't ready. If something is ready, then we leave it as + // a valid pointer. + + bool skip_wait = has_triggered_condition( + subscriptions, guard_conditions, services, clients, events); + bool wait_result = true; + + if (!skip_wait) { + if (guard_conditions) { + // Attach the wait set condition variable to each guard condition. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { + // This is hard to track down, but each of the (void *) pointers in + // guard_conditions->guard_conditions points to the data field of the related + // rmw_guard_condition_t. So we can directly cast it to GuardCondition. + GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); + if (gc != nullptr) { + gc->attach_condition(&wait_set_data->condition_variable); + } + } + } + + if (subscriptions) { + // Attach the wait set condition variable to each subscription. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { + auto sub_data = static_cast(subscriptions->subscribers[i]); + if (sub_data != nullptr) { + std::lock_guard internal_lock(sub_data->internal_mutex); + sub_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (services) { + // Attach the wait set condition variable to each service. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + std::lock_guard internal_lock(serv_data->internal_mutex); + serv_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (clients) { + // Attach the wait set condition variable to each client. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < clients->client_count; ++i) { + rmw_client_data_t * client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + std::lock_guard internal_lock(client_data->internal_mutex); + client_data->condition = &wait_set_data->condition_variable; + } + } + } + + std::unique_lock lock(wait_set_data->condition_mutex); + + // According to the RMW documentation, if wait_timeout is NULL that means + // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait + // for that amount of time. + if (wait_timeout == nullptr) { + wait_set_data->condition_variable.wait(lock); + } else { + if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { + std::cv_status wait_status = wait_set_data->condition_variable.wait_for( + lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); + wait_result = wait_status == std::cv_status::no_timeout; + } } } @@ -3145,6 +3228,7 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { + std::lock_guard internal_lock(sub_data->internal_mutex); sub_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL @@ -3161,6 +3245,7 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { + std::lock_guard internal_lock(serv_data->internal_mutex); serv_data->condition = nullptr; if (serv_data->query_queue.empty()) { // Setting to nullptr lets rcl know that this service is not ready @@ -3175,6 +3260,7 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { + std::lock_guard internal_lock(client_data->internal_mutex); client_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL @@ -3186,7 +3272,7 @@ rmw_wait( } } - return RMW_RET_OK; + return (skip_wait || wait_result) ? RMW_RET_OK : RMW_RET_TIMEOUT; } //============================================================================== From 3389d172b8e1e9805dc92b6a059fc8250940be47 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 18 Jan 2024 19:29:47 +0000 Subject: [PATCH 17/18] Switch which end of the deque subscriptions push and pop from. Since it is a deque, it doesn't *really* matter whether we push from the back and pull from the front, or push from the front and pull from the back. Switch to pushing to the back and pulling from the front so we match what the clients and services are doing. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 6 +++--- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index ee52b895..6e80a97c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -65,12 +65,12 @@ void sub_data_handler( sub_data->queue_depth, z_loan(keystr)); - std::unique_ptr old = std::move(sub_data->message_queue.back()); + std::unique_ptr old = std::move(sub_data->message_queue.front()); z_drop(&old->payload); - sub_data->message_queue.pop_back(); + sub_data->message_queue.pop_front(); } - sub_data->message_queue.emplace_front( + sub_data->message_queue.emplace_back( std::make_unique( zc_sample_payload_rcinc(sample), sample->timestamp.time, sample->timestamp.id.id)); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 492f7014..0e1d53c4 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1510,9 +1510,8 @@ static rmw_ret_t __rmw_take( return RMW_RET_OK; } - // NOTE(CH3): Potential place to handle "QoS" (e.g. could pop from back so it is LIFO) - msg_data = std::move(sub_data->message_queue.back()); - sub_data->message_queue.pop_back(); + msg_data = std::move(sub_data->message_queue.front()); + sub_data->message_queue.pop_front(); } // Object that manages the raw buffer From ea436aebe838c66dbb950ae9bcb6942df986a989 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 18 Jan 2024 19:37:41 +0000 Subject: [PATCH 18/18] Remove unnecessary configuration file. Signed-off-by: Chris Lalancette --- rmw_zenoh_config.json5 | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 rmw_zenoh_config.json5 diff --git a/rmw_zenoh_config.json5 b/rmw_zenoh_config.json5 deleted file mode 100644 index 9b8d8887..00000000 --- a/rmw_zenoh_config.json5 +++ /dev/null @@ -1,18 +0,0 @@ -{ - /// The node's mode (router, peer or client) - mode: "peer", - - /// The default timeout to apply to queries in milliseconds. - queries_default_timeout: 10000, - - /// Configure internal transport parameters - transport: { - qos: { - enabled: true, - }, - // Shared memory configuration - shared_memory: { - enabled: false, - }, - }, -}