From 5c09185f59d0d577a08bcdc83ddba39463164da4 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 5 Jul 2024 13:52:04 -0400 Subject: [PATCH 1/3] Fix replies coming in after a client is destroyed. (#228) It turns out that in Zenoh, once a query is in-flight (via the z_get zenoh-c API), there is no way to cancel it. With that in mind, consider a sequence of events: 1. rmw_create_client 2. rmw_send_request 3. rmw_destroy_client If all of that happens before the query callback is called, we'll have freed up the structure that we passed to the zenoh closure (client_data), and thus the callback will access already-freed memory. The fix for this is relatively complicated. First of all, anytime that there is a call to rmw_send_request (which calls z_get()), it increases a per-client counter. rmw_destroy_client() checks to see if that counter is 0. If it is, we know both that there are no queries in flight, and also that there will be no new ones (because we are cleaning the client up). Thus the structure is freed directly. If the counter is greater than 0, then there is at least one query in flight and it is not safe to free the structure. However, the client_data structure is marked as "shutdown" at this point. There will not be any new requests for this client, but the in-flight ones still need to be dealt with. For the in-flight ones, we add in a client_data_drop() function that Zenoh calls when the query is complete. That function first decrements the number of in-flight queries. If the client is shutdown, and the number of in-flight queries is 0, then it is safe to free the client_data structure. If the client is shutdown but there are other queries in flight, no actual work is done except for the decrement. There is one case which is not handled here at all, and that has to do with timeouts. Currently the rmw_zenoh_cpp client query timeout is set to essentially infinite. 
Thus, if a query is in-flight, but never returns, the memory corresponding to that client will be leaked. However, this is already an existing problem; this patch changes that from a UB to a memory leak. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 70 +++++++++++++++++++++ rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 35 ++++++++++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 27 +++++--- 3 files changed, 124 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index eae1fdfc..51f84b33 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -28,6 +28,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" #include "attachment_helpers.hpp" #include "rmw_data_types.hpp" @@ -393,6 +394,42 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +void rmw_client_data_t::increment_in_flight_callbacks() +{ + std::lock_guard lock(in_flight_mutex_); + num_in_flight_++; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +bool rmw_client_data_t::shutdown_and_query_in_flight() +{ + std::lock_guard lock(in_flight_mutex_); + is_shutdown_ = true; + + return num_in_flight_ > 0; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. 
+bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(in_flight_mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + +bool rmw_client_data_t::is_shutdown() const +{ + std::lock_guard lock(in_flight_mutex_); + return is_shutdown_; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -531,6 +568,13 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (client_data->is_shutdown()) { + return; + } + if (!z_reply_check(reply)) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -556,4 +600,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } + +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. 
+ bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + client_data->context->options.allocator.deallocate( + client_data, client_data->context->options.allocator.state); + } + } +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index f10e84f3..60e4bd01 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data); ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * client_data); +void client_data_drop(void * data); ///============================================================================= class ZenohQuery final @@ -297,7 +298,6 @@ class rmw_client_data_t final std::shared_ptr entity; z_owned_keyexpr_t keyexpr; - z_owned_closure_reply_t zn_closure_reply; // Store the actual QoS profile used to configure this client. // The QoS is reused for sending requests and getting responses. @@ -329,6 +329,17 @@ class rmw_client_data_t final DataCallbackManager data_callback_mgr; + // See the comment for "num_in_flight" below on the use of this method. + void increment_in_flight_callbacks(); + + // See the comment for "num_in_flight" below on the use of this method. + bool shutdown_and_query_in_flight(); + + // See the comment for "num_in_flight" below on the use of this method. 
+ bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + + bool is_shutdown() const; + private: void notify(); @@ -340,6 +351,28 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; + + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // + // The next 3 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ + // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. 
+ mutable std::mutex in_flight_mutex_; + bool is_shutdown_{false}; + size_t num_in_flight_{0}; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 21f72b34..cf712f2e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2360,7 +2360,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_INVALID_ARGUMENT); // CLEANUP =================================================================== - z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); zc_liveliness_undeclare_token(z_move(client_data->token)); @@ -2372,9 +2371,13 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (!client_data->shutdown_and_query_in_flight()) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + allocator->deallocate(client->data, allocator->state); + } allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -2407,6 +2410,10 @@ rmw_send_request( "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + if (client_data->is_shutdown()) { + return RMW_RET_ERROR; + } + rmw_context_impl_s * context_impl = static_cast( client_data->context->impl); @@ -2461,6 +2468,10 @@ rmw_send_request( z_bytes_map_drop(z_move(map)); }); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. 
+ client_data->increment_in_flight_callbacks(); + opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; @@ -2474,11 +2485,13 @@ rmw_send_request( // and any number. opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - client_data->zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data); + z_owned_closure_reply_t zn_closure_reply = + z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( - z_loan(context_impl->session), z_loan( - client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); + z_loan(context_impl->session), + z_loan(client_data->keyexpr), "", + z_move(zn_closure_reply), + &opts); return RMW_RET_OK; } From 5fbf902e48fc73ff13108eb64abe3c17b4acbcd9 Mon Sep 17 00:00:00 2001 From: Yadu Date: Sun, 7 Jul 2024 22:39:09 -0700 Subject: [PATCH 2/3] Cleanup logging (#239) Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index cf712f2e..4b0cb076 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -542,9 +542,6 @@ rmw_create_publisher( if (RMW_RET_OK != ret) { return nullptr; } - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "create publisher liveliness: %zu", - publisher_data->adapted_qos_profile.liveliness); publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_hash = type_support->get_type_hash_func(type_support); From de50434a816a0d84ce10cc1bd8797ece6e476e5d Mon Sep 17 00:00:00 2001 From: Yuyuan Yuan Date: Wed, 17 Jul 2024 02:45:24 +0800 Subject: [PATCH 3/3] fix: log the function name, the file name, and the line number with macros (#245) --- rmw_zenoh_cpp/src/detail/logging.cpp | 5 ++++- rmw_zenoh_cpp/src/detail/logging.hpp | 3 +++ 
rmw_zenoh_cpp/src/detail/logging_macros.hpp | 10 +++++----- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/logging.cpp b/rmw_zenoh_cpp/src/detail/logging.cpp index b9ed8cbe..1977b0f2 100644 --- a/rmw_zenoh_cpp/src/detail/logging.cpp +++ b/rmw_zenoh_cpp/src/detail/logging.cpp @@ -36,6 +36,9 @@ void Logger::set_log_level(RCUTILS_LOG_SEVERITY new_level) void Logger::log_named( RCUTILS_LOG_SEVERITY level, + const char * function_name, + const char * file_name, + size_t line_number, const char * name, const char * message, ...) const @@ -47,7 +50,7 @@ void Logger::log_named( RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to get timestamp while doing a console logging.\n"); return; } - static rcutils_log_location_t log_location = {__func__, __FILE__, __LINE__}; + static rcutils_log_location_t log_location = {function_name, file_name, line_number}; va_list args; va_start(args, message); rcutils_logging_console_output_handler( diff --git a/rmw_zenoh_cpp/src/detail/logging.hpp b/rmw_zenoh_cpp/src/detail/logging.hpp index 171d5694..35ca11ed 100644 --- a/rmw_zenoh_cpp/src/detail/logging.hpp +++ b/rmw_zenoh_cpp/src/detail/logging.hpp @@ -32,6 +32,9 @@ class Logger // Log to the console. void log_named( RCUTILS_LOG_SEVERITY level, + const char * function_name, + const char * file_name, + size_t line_number, const char * name, const char * message, ...) const; diff --git a/rmw_zenoh_cpp/src/detail/logging_macros.hpp b/rmw_zenoh_cpp/src/detail/logging_macros.hpp index 1e6ac74f..81201856 100644 --- a/rmw_zenoh_cpp/src/detail/logging_macros.hpp +++ b/rmw_zenoh_cpp/src/detail/logging_macros.hpp @@ -24,14 +24,14 @@ // invoke GraphCache::parse_put() and GraphCache::parse_del() functions. // See https://github.com/ros2/rmw_zenoh/issues/182 for more details. #define RMW_ZENOH_LOG_DEBUG_NAMED(...) 
{rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_DEBUG, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_ERROR_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_ERROR, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_FATAL_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_FATAL, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_FATAL, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_INFO_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_INFO, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_WARN_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_WARN, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} #endif // DETAIL__LOGGING_MACROS_HPP_