Fix replies coming in after a client is destroyed. #228

Merged
merged 3 commits into from
Jul 5, 2024
70 changes: 70 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
@@ -28,6 +28,7 @@
#include "rcpputils/scope_exit.hpp"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/macros.hpp"

#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"
@@ -387,6 +388,42 @@ std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
return latest_reply;
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class
// for the use of this method.
void rmw_client_data_t::increment_in_flight_callbacks()
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
num_in_flight_++;
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class
// for the use of this method.
bool rmw_client_data_t::shutdown_and_query_in_flight()
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
is_shutdown_ = true;

return num_in_flight_ > 0;
}

//==============================================================================
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class
// for the use of this method.
bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight)
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
queries_in_flight = --num_in_flight_ > 0;
return is_shutdown_;
}

bool rmw_client_data_t::is_shutdown() const
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
return is_shutdown_;
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
@@ -525,6 +562,13 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
);
return;
}

This decrement should be done in the drop function of the z_owned_closure_reply_t. There is no guarantee that the call function will be called only once per query, while drop will be called once per query at the end.
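
To illustrate the point above, here is a minimal sketch of why the decrement belongs in drop rather than call. It does not use zenoh-c; `ReplyClosure`, `Counters`, `on_reply`, `on_drop`, and `simulate_query` are hypothetical names that only mimic the call/drop pair of a reply closure, assuming call may run any number of times and drop runs once.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a reply closure: a context pointer, a "call"
// function (may run zero or more times per query) and a "drop" function
// (assumed to run exactly once, when the query is finished).
struct ReplyClosure
{
  void * context;
  void (* call)(int reply, void * context);
  void (* drop)(void * context);
};

// Hypothetical bookkeeping; only the counters matter for this sketch.
struct Counters
{
  std::size_t in_flight{0};
  std::size_t replies_seen{0};
};

void on_reply(int /*reply*/, void * context)
{
  // Do not touch in_flight here: this can run 0, 1, or many times per query.
  static_cast<Counters *>(context)->replies_seen++;
}

void on_drop(void * context)
{
  // Runs once per query, so the count stays balanced with the increment.
  auto * counters = static_cast<Counters *>(context);
  assert(counters->in_flight > 0);
  counters->in_flight--;
}

// Simulates one query that produces `num_replies` replies and then ends.
void simulate_query(Counters & counters, int num_replies)
{
  counters.in_flight++;
  ReplyClosure closure{&counters, on_reply, on_drop};
  for (int i = 0; i < num_replies; ++i) {
    closure.call(i, closure.context);
  }
  closure.drop(closure.context);
}
```

With this shape, a query that yields three replies and a query that yields none both leave `in_flight` at zero, which would not hold if the decrement lived in `on_reply`.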

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
if (client_data->is_shutdown()) {
return;
}

if (!z_reply_check(reply)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
@@ -550,4 +594,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}

void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_client_data_t *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
);
return;
}

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
bool queries_in_flight = false;
bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight);

if (is_shutdown) {
if (!queries_in_flight) {
// Save the context pointer first: client_data must not be read after its destructor runs.
auto * context = client_data->context;
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );
context->options.allocator.deallocate(client_data, context->options.allocator.state);
}
}
}

} // namespace rmw_zenoh_cpp
35 changes: 34 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
@@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data);

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * client_data);
void client_data_drop(void * data);

///=============================================================================
class ZenohQuery final
@@ -297,7 +298,6 @@ class rmw_client_data_t final
std::shared_ptr<liveliness::Entity> entity;

z_owned_keyexpr_t keyexpr;
z_owned_closure_reply_t zn_closure_reply;

// Store the actual QoS profile used to configure this client.
// The QoS is reused for sending requests and getting responses.
@@ -329,6 +329,17 @@ class rmw_client_data_t final

DataCallbackManager data_callback_mgr;

// See the comment for "num_in_flight" below on the use of this method.
void increment_in_flight_callbacks();

// See the comment for "num_in_flight" below on the use of this method.
bool shutdown_and_query_in_flight();

// See the comment for "num_in_flight" below on the use of this method.
bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight);

bool is_shutdown() const;

private:
void notify();

@@ -340,6 +351,28 @@ class rmw_client_data_t final

std::deque<std::unique_ptr<rmw_zenoh_cpp::ZenohReply>> reply_queue_;
mutable std::mutex reply_queue_mutex_;

// rmw_zenoh uses Zenoh queries to implement clients. In Zenoh, there is no way to cancel a
// query once it is in-flight via the z_get() zenoh-c API. Thus, if an rmw_zenoh_cpp user calls
// rmw_create_client(), rmw_send_request(), and rmw_destroy_client(), but the query reply
// comes in after rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory.
//
// The next 3 variables are used to avoid that situation. Any time a query is initiated via
// rmw_send_request(), num_in_flight_ is incremented. When Zenoh drops the closure for the
// query (client_data_drop(), once per query), num_in_flight_ is decremented. When
// rmw_destroy_client() is called, is_shutdown_ is set to true. If num_in_flight_ is 0 at that
// point, the data associated with this structure is freed. If num_in_flight_ is *not* 0, the
// data associated with this structure is kept alive. In the situation where is_shutdown_ is
// true and num_in_flight_ drops to 0 in client_data_drop(), that function frees the structure.
//
// There is one case which is not handled by this, which has to do with timeouts. The query
// timeout is currently set to essentially infinite. Thus, if a query is in-flight but never
// returns, the memory in this structure will never be freed. There isn't much we can do about
// that at this time, but we may want to consider changing the timeout so that the memory can
// eventually be freed up.
mutable std::mutex in_flight_mutex_;
bool is_shutdown_{false};
size_t num_in_flight_{0};
};
} // namespace rmw_zenoh_cpp
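
The bookkeeping described in the comment above can be sketched in isolation. This is a simplified model under stated assumptions, not the rmw_zenoh_cpp code: `ClientData`, `destroy_client`, `query_dropped`, and the `g_frees` counter are hypothetical, and plain `delete` stands in for the rmw allocator.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical, simplified stand-in for the per-client data.
class ClientData
{
public:
  // Called when a request is sent.
  void increment_in_flight()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ++num_in_flight_;
  }

  // Called from the destroy path. Returns true if queries are still
  // outstanding, in which case the caller must NOT free the object.
  bool shutdown_and_query_in_flight()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    is_shutdown_ = true;
    return num_in_flight_ > 0;
  }

  // Called once per query from the drop path. Returns true if this was the
  // last outstanding query of an already-destroyed client.
  bool decrement_and_should_free()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(num_in_flight_ > 0);
    --num_in_flight_;
    return is_shutdown_ && num_in_flight_ == 0;
  }

private:
  std::mutex mutex_;
  bool is_shutdown_{false};
  std::size_t num_in_flight_{0};
};

// Counts how many times an object was freed, for demonstration only.
int g_frees = 0;

// Destroy path: free immediately only if nothing is in flight.
void destroy_client(ClientData * data)
{
  if (!data->shutdown_and_query_in_flight()) {
    delete data;
    ++g_frees;
  }
}

// Drop path: the last query of a shut-down client frees the object.
void query_dropped(ClientData * data)
{
  if (data->decrement_and_should_free()) {
    delete data;
    ++g_frees;
  }
}
```

Whichever of the two paths observes "shut down and nothing in flight" performs the free, so the object is released exactly once regardless of ordering. Note that both helpers return (releasing the lock) before the object is deleted. As the comment says, a query that never completes would keep the object alive indefinitely.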

27 changes: 20 additions & 7 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
@@ -2378,7 +2378,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
return RMW_RET_INVALID_ARGUMENT);

// CLEANUP ===================================================================
z_drop(z_move(client_data->zn_closure_reply));
z_drop(z_move(client_data->keyexpr));
zc_liveliness_undeclare_token(z_move(client_data->token));

@@ -2390,9 +2389,13 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport,
);
allocator->deallocate(client_data->response_type_support, allocator->state);
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );

allocator->deallocate(client->data, allocator->state);
// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
if (!client_data->shutdown_and_query_in_flight()) {
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );
allocator->deallocate(client->data, allocator->state);
}

allocator->deallocate(const_cast<char *>(client->service_name), allocator->state);
allocator->deallocate(client, allocator->state);
@@ -2425,6 +2428,10 @@ rmw_send_request(
"Unable to retrieve client_data from client.",
RMW_RET_INVALID_ARGUMENT);

if (client_data->is_shutdown()) {
return RMW_RET_ERROR;
}

rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(
client_data->context->impl);

@@ -2479,6 +2486,10 @@ rmw_send_request(
z_bytes_map_drop(z_move(map));
});

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
client_data->increment_in_flight_callbacks();

opts.attachment = z_bytes_map_as_attachment(&map);

opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
@@ -2492,11 +2503,13 @@ rmw_send_request(
// and any number.
opts.consolidation = z_query_consolidation_latest();
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
client_data->zn_closure_reply =
z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data);
z_owned_closure_reply_t zn_closure_reply =

Shouldn't we check that is_shutdown_ is false before issuing new queries for this client? (Maybe another check elsewhere already prevents that and I'm just not aware of it.)

Collaborator Author

Yes, good point. I'll add this in as well.

z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data);
z_get(
z_loan(context_impl->session), z_loan(
client_data->keyexpr), "", &client_data->zn_closure_reply, &opts);
z_loan(context_impl->session),
z_loan(client_data->keyexpr), "",
z_move(zn_closure_reply),
Member

@Yadunund Jul 1, 2024

Wondering if it makes sense for the rmw_client to own the callback. What happens when the callback does not get triggered yet (the service is busy), but then the client is destroyed?

Would it be better to keep the original implementation but ensure we free the callback with z_reply_null when we call rmw_client_fini?

According to the z_get documentation:

> callback – The callback function that will be called on reception of replies for this query. Note that the reply parameter of the callback is passed by mutable reference, but will be dropped once your callback exits to help you avoid memory leaks. If you’d rather take ownership, please refer to the documentation of z_reply_null()

Will closing the Zenoh session ensure all callbacks are freed?
cc: @JEnoch

Collaborator Author

> Wondering if it makes sense for the rmw_client to own the callback. What happens when the callback does not get triggered yet (the service is busy), but then the client is destroyed?

Right, so that is a good question. And the answer is that it is up to rmw_zenoh_cpp to deal with this situation, by not freeing the underlying memory until all in-flight callbacks are accounted for. I'll point out that this is already a problem in rmw_zenoh_cpp; if a callback for a query comes in after rmw_destroy_client has been called, we'll access freed memory. I have an upcoming patch which fixes that, which is built on top of this. If you think it is better, I can combine that into this PR.

> Would it be better to keep the original implementation but ensure we free the callback with z_reply_null when we call rmw_client_fini?

I'm not 100% sure, but I believe that this is a different thing. This PR is concerned with the ownership of the closure itself. To see what I am talking about, consider that in almost all other places where we call into the Zenoh API, we use either z_loan or z_move. The code before this PR was using a raw pointer reference, which is a code smell.

What you are talking about is the ownership of the reply. There, we are already taking ownership of the memory during the callback (via z_reply_null), so I don't think we need to make a change. Does that make sense?

Also, it might be worthwhile to get feedback from @JEnoch and @OlivierHecart, since they were the ones that suggested this to me to begin with.

Member

Thanks for clarifying. I think it would be good to combine those changes in this PR as well.

> What you are talking about is the ownership of the reply.

Ah yeah, sorry, I got things mixed up, but my original concern still stands: does the callback get freed if the client is destroyed before we receive a reply?

Collaborator Author

> Ah yeah, sorry, I got things mixed up, but my original concern still stands: does the callback get freed if the client is destroyed before we receive a reply?

This is a question for the Zenoh developers, but I believe not. That's the reason for doing the z_move, so that zenoh-c/Zenoh proper is responsible for the lifetime of the closure. I'll also point out that in my tests with valgrind, this didn't show a problem.
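
As a rough illustration of what moving the closure means for ownership, here is a sketch that models it with `std::unique_ptr`. It is not the zenoh-c API: `OwnedClosure`, `DropCounter`, `fake_get`, and `send_request` are hypothetical names, and the deleter merely plays the role of the closure's drop function.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical deleter that plays the role of the closure's "drop" function
// and records how many times it ran.
struct DropCounter
{
  int * drops;
  void operator()(int * context) const
  {
    ++(*drops);
    delete context;
  }
};

// Hypothetical owned closure: owning the pointer means owning the duty to drop.
using OwnedClosure = std::unique_ptr<int, DropCounter>;

// Hypothetical stand-in for the query call: it takes the closure by value,
// so the callee owns it and releases it when the query is finished.
void fake_get(OwnedClosure closure)
{
  assert(closure != nullptr);
  // ... replies would be delivered through the closure here ...
}  // closure goes out of scope here, so "drop" runs exactly once

// Returns true if the caller's handle is empty after the move.
bool send_request(int * drops)
{
  OwnedClosure closure(new int(42), DropCounter{drops});
  fake_get(std::move(closure));
  // The caller no longer owns anything and must not drop the closure itself.
  return closure == nullptr;
}
```

In this model the caller keeps no handle to the closure after the call, which is the property the raw pointer to a member closure did not provide.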

&opts);

return RMW_RET_OK;
}