Skip to content

Commit

Permalink
Redo the per-client-data in-flight monitoring.
Browse files Browse the repository at this point in the history
The previous approach with using a global map of
pointers to shared_ptr does not work for one simple reason;
on shutdown, C++ does not define the order in which statics
are destroyed.  Thus it is possible (and even likely in some
cases) that on Ctrl-C, the global map would be deleted, and then
we would try to access it, leading to crashes.  I was seeing some
of these while running the rcl tests.

Instead, go back to the previous solution where we were storing
num_in_flight completely within the ClientData structure.
This requires doing a bit of gymnastics with the data types,
and reintrodues the possible UB during shutdown if the structure
was destructed but there is still queries in flight.  But it
otherwise fixes all of the tests for me locally.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Nov 13, 2024
1 parent 2976b9c commit bba03a8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 48 deletions.
103 changes: 64 additions & 39 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
namespace
{

std::mutex client_data_ptr_to_shared_ptr_map_mutex;
std::unordered_map<const rmw_zenoh_cpp::ClientData *,
std::shared_ptr<rmw_zenoh_cpp::ClientData>> client_data_ptr_to_shared_ptr_map;

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
Expand All @@ -59,16 +55,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}

std::shared_ptr<rmw_zenoh_cpp::ClientData> client_data_shared_ptr{nullptr};
{
std::lock_guard<std::mutex> lk(client_data_ptr_to_shared_ptr_map_mutex);
if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) {
return;
}
client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data];
}

if (client_data_shared_ptr->is_shutdown()) {
if (client_data->is_shutdown()) {
return;
}

Expand All @@ -94,13 +81,28 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

client_data_shared_ptr->add_new_reply(
client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));

// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}

///=============================================================================
void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
);
return;
}

client_data->decrement_in_flight_and_conditionally_remove();
}

} // namespace

namespace rmw_zenoh_cpp
Expand All @@ -109,6 +111,7 @@ namespace rmw_zenoh_cpp
std::shared_ptr<ClientData> ClientData::make(
z_session_t session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
std::size_t node_id,
std::size_t service_id,
Expand Down Expand Up @@ -191,6 +194,7 @@ std::shared_ptr<ClientData> ClientData::make(
std::shared_ptr<ClientData> client_data = std::shared_ptr<ClientData>(
new ClientData{
node,
client,
entity,
request_members,
response_members,
Expand All @@ -203,29 +207,29 @@ std::shared_ptr<ClientData> ClientData::make(
return nullptr;
}

std::lock_guard<std::mutex> lk(client_data_ptr_to_shared_ptr_map_mutex);
client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data);

return client_data;
}

///=============================================================================
ClientData::ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * rmw_client,
std::shared_ptr<liveliness::Entity> entity,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_client_(rmw_client),
entity_(std::move(entity)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(request_type_support),
response_type_support_(response_type_support),
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false)
is_shutdown_(false),
num_in_flight_(0)
{
// Do nothing.
}
Expand Down Expand Up @@ -269,28 +273,28 @@ bool ClientData::init(z_session_t session)
///=============================================================================
liveliness::TopicInfo ClientData::topic_info() const
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
return entity_->topic_info().value();
}

///=============================================================================
bool ClientData::liveliness_is_valid() const
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
return zc_liveliness_token_check(&token_);
}

///=============================================================================
void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
entity_->copy_gid(out_gid);
}

///=============================================================================
void ClientData::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
const rmw_qos_profile_t adapted_qos_profile =
entity_->topic_info().value().qos_;
if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
Expand Down Expand Up @@ -324,7 +328,7 @@ rmw_ret_t ClientData::take_response(
void * ros_response,
bool * taken)
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
*taken = false;

if (is_shutdown_ || reply_queue_.empty()) {
Expand Down Expand Up @@ -389,7 +393,7 @@ rmw_ret_t ClientData::send_request(
const void * ros_request,
int64_t * sequence_id)
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -468,7 +472,7 @@ rmw_ret_t ClientData::send_request(
// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
z_owned_closure_reply_t zn_closure_reply =
z_closure(client_data_handler, nullptr, this);
z_closure(client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
z_loan(keyexpr_), "",
Expand All @@ -481,11 +485,6 @@ rmw_ret_t ClientData::send_request(
///=============================================================================
ClientData::~ClientData()
{
{
std::lock_guard<std::mutex> lk(client_data_ptr_to_shared_ptr_map_mutex);
client_data_ptr_to_shared_ptr_map.erase(this);
}

const rmw_ret_t ret = this->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
Expand All @@ -501,15 +500,15 @@ void ClientData::set_on_new_response_callback(
rmw_event_callback_t callback,
const void * user_data)
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
data_callback_mgr_.set_callback(user_data, std::move(callback));
}

///=============================================================================
bool ClientData::queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data)
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (!reply_queue_.empty()) {
return true;
}
Expand All @@ -521,19 +520,17 @@ bool ClientData::queue_has_data_and_attach_condition_if_not(
///=============================================================================
bool ClientData::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
wait_set_data_ = nullptr;

return reply_queue_.empty();
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
void ClientData::_shutdown()
{
rmw_ret_t ret = RMW_RET_OK;
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
return ret;
return;
}

// Unregister this node from the ROS graph.
Expand All @@ -545,13 +542,41 @@ rmw_ret_t ClientData::shutdown()
}

is_shutdown_ = true;
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return RMW_RET_OK;
}

///=============================================================================
bool ClientData::shutdown_and_query_in_flight()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return num_in_flight_ > 0;
}

///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
std::shared_ptr<rmw_zenoh_cpp::NodeData> node_data = context_impl->get_node_data(rmw_node_);
node_data->delete_client_data(rmw_client_);
}
}

///=============================================================================
bool ClientData::is_shutdown() const
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
return is_shutdown_;
}
} // namespace rmw_zenoh_cpp
22 changes: 17 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
static std::shared_ptr<ClientData> make(
z_session_t session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
std::size_t node_id,
std::size_t service_id,
Expand Down Expand Up @@ -79,22 +80,27 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const void * ros_request,
int64_t * sequence_id);

// Set a callback to be called when events happen.
void set_on_new_response_callback(
rmw_event_callback_t callback,
const void * user_data);

// rmw_wait helpers.
// Check if there is data in the queue, and if not attach the wait set condition variable.
bool queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data);

// Detach any attached wait set condition variable, and return whether there is data in the queue.
bool detach_condition_and_queue_is_empty();

// See the comment for "num_in_flight" below on the use of this method.
void decrement_queries_in_flight();

// Shutdown this ClientData.
rmw_ret_t shutdown();

// Shutdown this ClientData, and return whether there are any requests currently in flight.
bool shutdown_and_query_in_flight();

// Decrement the in flight requests, and if that drops to 0 remove the client from the node.
void decrement_in_flight_and_conditionally_remove();

// Check if this ClientData is shutdown.
bool is_shutdown() const;

Expand All @@ -105,6 +111,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Constructor.
ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * client,
std::shared_ptr<liveliness::Entity> entity,
const void * request_type_support_impl,
const void * response_type_support_impl,
Expand All @@ -114,10 +121,14 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Initialize the Zenoh objects for this entity.
bool init(z_session_t session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();

// Internal mutex.
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
// The parent node.
const rmw_node_t * rmw_node_;
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
// An owned keyexpression.
Expand All @@ -139,6 +150,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
size_t sequence_number_;
// Shutdown flag.
bool is_shutdown_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
using ClientDataConstPtr = std::shared_ptr<const ClientData>;
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ bool NodeData::create_client_data(
auto client_data = ClientData::make(
std::move(session),
node_,
client,
entity_->node_info(),
id_,
std::move(id),
Expand Down Expand Up @@ -387,10 +388,9 @@ void NodeData::delete_client_data(const rmw_client_t * const client)
if (client_it == clients_.end()) {
return;
}
// Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping
// it alive as long as necessary.
client_it->second->shutdown();
clients_.erase(client);
if (!client_it->second->shutdown_and_query_in_flight()) {
clients_.erase(client);
}
}

///=============================================================================
Expand Down

0 comments on commit bba03a8

Please sign in to comment.