Skip to content

Commit

Permalink
Fix replies coming in after a client is destroyed. (#228)
Browse files Browse the repository at this point in the history
It turns out that in Zenoh, once a query is in-flight (via
the z_get zenoh-c API), there is no way to cancel it. With
that in mind, consider a sequence of events:

1.  rmw_create_client
2.  rmw_send_request
3.  rmw_destroy_client

If all of that happens before the query callback is called,
we'll have freed up the structure that we passed to the zenoh
closure (client_data), and thus the structure will access
already freed memory.

The fix for this is relatively complicated. First of all,
anytime that there is a call to rmw_send_request (which calls z_get()),
it increases a per-client counter. rmw_destroy_client() checks to
to see if that counter is 0. If it is, we know both that there are
no queries in flight, and also that there will be no new ones (because
we are cleaning the client up). Thus the structure is freed directly.
If the counter is greater than 0, then there is at least one query
in flight and it is not safe to free the structure. However, the
client_data structure is marked as "shutdown" at this point. There
will not be any new requests for this client, but the in-flight ones
still need to be dealt with.

For the in-flight ones, we add in a client_data_drop() function that
Zenoh calls when the query is complete.  That function first decrements the
number of in-flight queries. If the client is shutdown, and the number
of in-flight queries is 0, then it is safe to free the client_data
structure. If the client is shutdown but there are other queries in
flight, no actual work is done except for the decrement.

There is one case which is not handled here at all, and that has
to do with timeouts. Currently the rmw_zenoh_cpp client query timeout
is set to essentially infinite. Thus, if a query is in-flight, but never
returns, the memory corresponding to that client will be leaked.
However, this is already an existing problem; this patch changes that
from a UB to a memory leak.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette authored Jul 5, 2024
1 parent 747a72e commit 5c09185
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 8 deletions.
70 changes: 70 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcpputils/scope_exit.hpp"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/macros.hpp"

#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"
Expand Down Expand Up @@ -393,6 +394,42 @@ std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
return latest_reply;
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class
// for the use of this method.
void rmw_client_data_t::increment_in_flight_callbacks()
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
num_in_flight_++;
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class
// for the use of this method.
bool rmw_client_data_t::shutdown_and_query_in_flight()
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
is_shutdown_ = true;

return num_in_flight_ > 0;
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure
// for the use of this method.
bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight)
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
queries_in_flight = --num_in_flight_ > 0;
return is_shutdown_;
}

bool rmw_client_data_t::is_shutdown() const
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
return is_shutdown_;
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
Expand Down Expand Up @@ -531,6 +568,13 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
);
return;
}

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
if (client_data->is_shutdown()) {
return;
}

if (!z_reply_check(reply)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -556,4 +600,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}

void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_client_data_t *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
);
return;
}

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
bool queries_in_flight = false;
bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight);

if (is_shutdown) {
if (!queries_in_flight) {
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );
client_data->context->options.allocator.deallocate(
client_data, client_data->context->options.allocator.state);
}
}
}

} // namespace rmw_zenoh_cpp
35 changes: 34 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data);

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * client_data);
void client_data_drop(void * data);

///=============================================================================
class ZenohQuery final
Expand Down Expand Up @@ -297,7 +298,6 @@ class rmw_client_data_t final
std::shared_ptr<liveliness::Entity> entity;

z_owned_keyexpr_t keyexpr;
z_owned_closure_reply_t zn_closure_reply;

// Store the actual QoS profile used to configure this client.
// The QoS is reused for sending requests and getting responses.
Expand Down Expand Up @@ -329,6 +329,17 @@ class rmw_client_data_t final

DataCallbackManager data_callback_mgr;

// See the comment for "num_in_flight" below on the use of this method.
void increment_in_flight_callbacks();

// See the comment for "num_in_flight" below on the use of this method.
bool shutdown_and_query_in_flight();

// See the comment for "num_in_flight" below on the use of this method.
bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight);

bool is_shutdown() const;

private:
void notify();

Expand All @@ -340,6 +351,28 @@ class rmw_client_data_t final

std::deque<std::unique_ptr<rmw_zenoh_cpp::ZenohReply>> reply_queue_;
mutable std::mutex reply_queue_mutex_;

// rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no
// way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an
// rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the
// query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory.
//
// The next 3 variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the
// query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_
// is set to true. If num_in_flight_ is 0, the data associated with this structure is freed.
// If num_in_flight_ is *not* 0, then the data associated with this structure is maintained.
// In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query
// callback, the query callback will free up the structure.
//
// There is one case which is not handled by this, which has to do with timeouts. The query
// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never
// returns, the memory in this structure will never be freed. There isn't much we can do about
// that at this time, but we may want to consider changing the timeout so that the memory can
// eventually be freed up.
mutable std::mutex in_flight_mutex_;
bool is_shutdown_{false};
size_t num_in_flight_{0};
};
} // namespace rmw_zenoh_cpp

Expand Down
27 changes: 20 additions & 7 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2360,7 +2360,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
return RMW_RET_INVALID_ARGUMENT);

// CLEANUP ===================================================================
z_drop(z_move(client_data->zn_closure_reply));
z_drop(z_move(client_data->keyexpr));
zc_liveliness_undeclare_token(z_move(client_data->token));

Expand All @@ -2372,9 +2371,13 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport,
);
allocator->deallocate(client_data->response_type_support, allocator->state);
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );

allocator->deallocate(client->data, allocator->state);
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
if (!client_data->shutdown_and_query_in_flight()) {
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );
allocator->deallocate(client->data, allocator->state);
}

allocator->deallocate(const_cast<char *>(client->service_name), allocator->state);
allocator->deallocate(client, allocator->state);
Expand Down Expand Up @@ -2407,6 +2410,10 @@ rmw_send_request(
"Unable to retrieve client_data from client.",
RMW_RET_INVALID_ARGUMENT);

if (client_data->is_shutdown()) {
return RMW_RET_ERROR;
}

rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(
client_data->context->impl);

Expand Down Expand Up @@ -2461,6 +2468,10 @@ rmw_send_request(
z_bytes_map_drop(z_move(map));
});

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
client_data->increment_in_flight_callbacks();

opts.attachment = z_bytes_map_as_attachment(&map);

opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
Expand All @@ -2474,11 +2485,13 @@ rmw_send_request(
// and any number.
opts.consolidation = z_query_consolidation_latest();
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
client_data->zn_closure_reply =
z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data);
z_owned_closure_reply_t zn_closure_reply =
z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data);
z_get(
z_loan(context_impl->session), z_loan(
client_data->keyexpr), "", &client_data->zn_closure_reply, &opts);
z_loan(context_impl->session),
z_loan(client_data->keyexpr), "",
z_move(zn_closure_reply),
&opts);

return RMW_RET_OK;
}
Expand Down

0 comments on commit 5c09185

Please sign in to comment.