diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 84ae0328..195f9ff6 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -43,10 +43,6 @@ namespace { -std::mutex client_data_ptr_to_shared_ptr_map_mutex; -std::unordered_map> client_data_ptr_to_shared_ptr_map; - ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -59,16 +55,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } - std::shared_ptr client_data_shared_ptr{nullptr}; - { - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - if (client_data_ptr_to_shared_ptr_map.count(client_data) == 0) { - return; - } - client_data_shared_ptr = client_data_ptr_to_shared_ptr_map[client_data]; - } - - if (client_data_shared_ptr->is_shutdown()) { + if (client_data->is_shutdown()) { return; } @@ -94,13 +81,28 @@ void client_data_handler(z_owned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data_shared_ptr->add_new_reply( + client_data->add_new_reply( std::make_unique(reply, received_timestamp)); // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } +///============================================================================= +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + client_data->decrement_in_flight_and_conditionally_remove(); +} + } // namespace namespace rmw_zenoh_cpp @@ -109,6 +111,7 @@ namespace rmw_zenoh_cpp std::shared_ptr ClientData::make( z_session_t session, const rmw_node_t * const node, + const rmw_client_t * client, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -191,6 +194,7 @@ std::shared_ptr ClientData::make( std::shared_ptr client_data = std::shared_ptr( new ClientData{ node, + client, entity, request_members, response_members, @@ -203,21 +207,20 @@ std::shared_ptr ClientData::make( return nullptr; } - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - client_data_ptr_to_shared_ptr_map.emplace(client_data.get(), client_data); - return client_data; } ///============================================================================= ClientData::ClientData( const rmw_node_t * rmw_node, + const rmw_client_t * rmw_client, std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support) : rmw_node_(rmw_node), + rmw_client_(rmw_client), entity_(std::move(entity)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), @@ -225,7 +228,8 @@ ClientData::ClientData( response_type_support_(response_type_support), wait_set_data_(nullptr), sequence_number_(1), - is_shutdown_(false) + is_shutdown_(false), + num_in_flight_(0) { // Do nothing. } @@ -269,28 +273,28 @@ bool ClientData::init(z_session_t session) ///============================================================================= liveliness::TopicInfo ClientData::topic_info() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return entity_->topic_info().value(); } ///============================================================================= bool ClientData::liveliness_is_valid() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return zc_liveliness_token_check(&token_); } ///============================================================================= void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); entity_->copy_gid(out_gid); } ///============================================================================= void ClientData::add_new_reply(std::unique_ptr reply) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); const rmw_qos_profile_t adapted_qos_profile = entity_->topic_info().value().qos_; if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && @@ -324,7 +328,7 @@ rmw_ret_t ClientData::take_response( void * ros_response, bool * taken) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); *taken = false; if (is_shutdown_ || reply_queue_.empty()) { @@ -389,7 +393,7 @@ rmw_ret_t ClientData::send_request( const void * ros_request, int64_t * sequence_id) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (is_shutdown_) { return RMW_RET_OK; } @@ -468,7 +472,7 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. z_owned_closure_reply_t zn_closure_reply = - z_closure(client_data_handler, nullptr, this); + z_closure(client_data_handler, client_data_drop, this); z_get( context_impl->session(), z_loan(keyexpr_), "", @@ -481,11 +485,6 @@ rmw_ret_t ClientData::send_request( ///============================================================================= ClientData::~ClientData() { - { - std::lock_guard lk(client_data_ptr_to_shared_ptr_map_mutex); - client_data_ptr_to_shared_ptr_map.erase(this); - } - const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -501,7 +500,7 @@ void ClientData::set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); data_callback_mgr_.set_callback(user_data, std::move(callback)); } @@ -509,7 +508,7 @@ void ClientData::set_on_new_response_callback( bool ClientData::queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data) { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (!reply_queue_.empty()) { return true; } @@ -521,19 +520,17 @@ bool ClientData::queue_has_data_and_attach_condition_if_not( ///============================================================================= bool ClientData::detach_condition_and_queue_is_empty() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); wait_set_data_ = nullptr; return reply_queue_.empty(); } ///============================================================================= -rmw_ret_t ClientData::shutdown() +void ClientData::_shutdown() { - rmw_ret_t ret = RMW_RET_OK; - std::lock_guard lock(mutex_); if (is_shutdown_) { - return ret; + return; } // Unregister this node from the ROS graph. @@ -545,13 +542,41 @@ rmw_ret_t ClientData::shutdown() } is_shutdown_ = true; +} + +///============================================================================= +rmw_ret_t ClientData::shutdown() +{ + std::lock_guard lock(mutex_); + _shutdown(); return RMW_RET_OK; } +///============================================================================= +bool ClientData::shutdown_and_query_in_flight() +{ + std::lock_guard lock(mutex_); + _shutdown(); + return num_in_flight_ > 0; +} + +///============================================================================= +void ClientData::decrement_in_flight_and_conditionally_remove() +{ + std::lock_guard lock(mutex_); + --num_in_flight_; + + if (is_shutdown_ && num_in_flight_ == 0) { + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); + std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); + node_data->delete_client_data(rmw_client_); + } +} + ///============================================================================= bool ClientData::is_shutdown() const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); return is_shutdown_; } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index cee312b1..eda4bf9c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -49,6 +49,7 @@ class ClientData final : public std::enable_shared_from_this static std::shared_ptr make( z_session_t session, const rmw_node_t * const node, + const rmw_client_t * client, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t service_id, @@ -79,22 +80,27 @@ class ClientData final : public std::enable_shared_from_this const void * ros_request, int64_t * sequence_id); + // Set a callback to be called when events happen. void set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data); - // rmw_wait helpers. + // Check if there is data in the queue, and if not attach the wait set condition variable. bool queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data); + // Detach any attached wait set condition variable, and return whether there is data in the queue. bool detach_condition_and_queue_is_empty(); - // See the comment for "num_in_flight" below on the use of this method. - void decrement_queries_in_flight(); - // Shutdown this ClientData. rmw_ret_t shutdown(); + // Shutdown this ClientData, and return whether there are any requests currently in flight. + bool shutdown_and_query_in_flight(); + + // Decrement the in flight requests, and if that drops to 0 remove the client from the node. + void decrement_in_flight_and_conditionally_remove(); + // Check if this ClientData is shutdown. bool is_shutdown() const; @@ -105,6 +111,7 @@ class ClientData final : public std::enable_shared_from_this // Constructor. ClientData( const rmw_node_t * rmw_node, + const rmw_client_t * client, std::shared_ptr entity, const void * request_type_support_impl, const void * response_type_support_impl, @@ -114,10 +121,14 @@ class ClientData final : public std::enable_shared_from_this // Initialize the Zenoh objects for this entity. bool init(z_session_t session); + // Shutdown this client (the mutex is expected to be held by the caller). + void _shutdown(); + // Internal mutex. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // The parent node. const rmw_node_t * rmw_node_; + const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; // An owned keyexpression. @@ -139,6 +150,7 @@ class ClientData final : public std::enable_shared_from_this size_t sequence_number_; // Shutdown flag. bool is_shutdown_; + size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index f3430280..bd3f3f6e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -347,6 +347,7 @@ bool NodeData::create_client_data( auto client_data = ClientData::make( std::move(session), node_, + client, entity_->node_info(), id_, std::move(id), @@ -387,10 +388,9 @@ void NodeData::delete_client_data(const rmw_client_t * const client) if (client_it == clients_.end()) { return; } - // Shutdown the client, then erase it. The code in rmw_client_data.cpp is careful about keeping - // it alive as long as necessary. - client_it->second->shutdown(); - clients_.erase(client); + if (!client_it->second->shutdown_and_query_in_flight()) { + clients_.erase(client); + } } ///=============================================================================