From 98962eacbd1f540b85039f7f9f34f92865fc0db2 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 1 Jul 2024 11:54:57 +0000 Subject: [PATCH] Make sure to track queries that are in-flight. It turns out that in Zenoh, once a query is in-flight (via the z_get zenoh-c API), there is no way to cancel it. With that in mind, consider a sequence of events: 1. rmw_create_client 2. rmw_send_request 3. rmw_destroy_client If all of that happens before the query callback is called, we'll have freed up the structure that we passed to the zenoh closure (client_data), and thus the structure will access already freed memory. The fix for this is relatively complicated. First of all, anytime that there is a call to rmw_send_request (which calls z_get()), it increases a per-client counter. rmw_destroy_client() checks to to see if that counter is 0. If it is, we know both that there are no queries in flight, and also that there will be no new ones (because we are cleaning the client up). Thus the structure is freed directly. If the counter is greater than 0, then there is at least one query in flight and it is not safe to free the structure. However, the client_data structure is marked as "shutdown" at this point. There will not be any *new* requests for this client, but the in-flight ones still need to be dealt with. For the in-flight ones, every time client_data_handler() (the rmw_zenoh_cpp callback for queries) is called, it first decrements the number of in-flight queries. If the client is shutdown, and the number of in-flight queries is 0, then it is safe to free the client_data structure. If the client is shutdown but there are other queries in flight, no actual work is done except for the decrement. There is one case which is not handled here at all, and that has to do with timeouts. Currently the rmw_zenoh_cpp client query timeout is set to essentially infinite. Thus, if a query is in-flight, but never returns, the memory corresponding to that client will be leaked. However, this is already an existing problem; this patch changes that from a UB to a memory leak. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 44 +++++++++++++++++++++ rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 31 +++++++++++++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 10 ++++- 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index c2b94ffb..5f03f5da 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -383,6 +383,36 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +void rmw_client_data_t::increment_in_flight_callbacks() +{ + std::lock_guard lock(in_flight_mutex_); + num_in_flight_++; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +bool rmw_client_data_t::shutdown_and_query_in_flight() +{ + std::lock_guard lock(in_flight_mutex_); + is_shutdown_ = true; + + return num_in_flight_ > 0; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. +bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(in_flight_mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -519,6 +549,20 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + client_data->context->options.allocator.deallocate( + client_data, client_data->context->options.allocator.state); + } + return; + } + if (!z_reply_check(reply)) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index fc719953..378ce0b7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -328,6 +328,15 @@ class rmw_client_data_t final DataCallbackManager data_callback_mgr; + // See the comment for "num_in_flight" below on the use of this method. + void increment_in_flight_callbacks(); + + // See the comment for "num_in_flight" below on the use of this method. + bool shutdown_and_query_in_flight(); + + // See the comment for "num_in_flight" below on the use of this method. + bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + private: void notify(); @@ -339,6 +348,28 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; + + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // + // The next 3 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ + // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. + std::mutex in_flight_mutex_; + bool is_shutdown_{false}; + size_t num_in_flight_{0}; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6ee087d2..d800afab 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2363,7 +2363,11 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) allocator->deallocate(client_data->response_type_support, allocator->state); RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (!client_data->shutdown_and_query_in_flight()) { + allocator->deallocate(client->data, allocator->state); + } allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -2471,6 +2475,10 @@ rmw_send_request( z_move(zn_closure_reply), &opts); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + client_data->increment_in_flight_callbacks(); + return RMW_RET_OK; }