From 536695758b3e6ddbb7ccc341976280367262532c Mon Sep 17 00:00:00 2001
From: Chris Lalancette
Date: Wed, 20 Nov 2024 16:51:39 -0500
Subject: [PATCH 1/3] Don't shutdown contained entities. (#313)

It is tempting to think of the entities in the RMW graph as a hierarchy,
where an rmw_context_t contains zero or more rmw_node_t, and an
rmw_node_t contains zero or more publishers, subscriptions, clients, and
services.  However, the reality is that the upper layers (particularly
rclcpp and rclpy) don't view the entities quite like that.  More
specifically, each entity is considered standalone and merely has
linkage to other entities in the graph.  For example, a publisher is a
standalone entity that happens to be linked to a particular node.

Because of this, it is not proper to shut down all nodes within a
context when the context is shut down.  Each node should continue to
live on until it is shut down itself.  A similar argument goes for node
shutdown; it should not shut down the publishers, subscriptions,
clients, and services that are contained within it.

This manifested itself as an exception that was being thrown in some of
the tests in test_communication.  Because those tests use a loop with
rclcpp::ok() followed by publisher->publish(), a test would sometimes
throw an exception if Ctrl-C was hit between the rclcpp::ok() check and
the publish() call.  That's because even though the context has been
shut down, the publisher is an independent entity that should continue
to exist until the next rclcpp::ok() check.

The fix here is simple: don't shut "contained" entities down during a
context or node shutdown.
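
As a rough sketch of the failing pattern (a hypothetical publish loop
with illustrative names, not the exact test_communication code):

    while (rclcpp::ok()) {
      // Ctrl-C can arrive here, after ok() but before publish().  The
      // publisher must remain usable until the next ok() check observes
      // the shutdown, rather than publish() throwing an exception.
      publisher->publish(msg);
      loop_rate.sleep();
    }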

Signed-off-by: Chris Lalancette
---
 .../src/detail/rmw_context_impl_s.cpp      | 13 -----
 rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 50 -------------------
 2 files changed, 63 deletions(-)

diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
index 0215a490..50b29038 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
@@ -232,19 +232,6 @@ class rmw_context_impl_s::Data final
       return ret;
     }
 
-    // Shutdown all the nodes in this context.
-    for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
-      ret = node_it->second->shutdown();
-      if (ret != RMW_RET_OK) {
-        RMW_ZENOH_LOG_ERROR_NAMED(
-          "rmw_zenoh_cpp",
-          "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
-          node_it->second->id(),
-          ret
-        );
-      }
-    }
-
     z_undeclare_subscriber(z_move(graph_subscriber_));
     if (shm_manager_.has_value()) {
      z_drop(z_move(shm_manager_.value()));
diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
index bd3f3f6e..0fabfd95 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
@@ -402,56 +402,6 @@ rmw_ret_t NodeData::shutdown()
     return ret;
   }
 
-  // Shutdown all the entities within this node.
-  for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
-    ret = pub_it->second->shutdown();
-    if (ret != RMW_RET_OK) {
-      RMW_ZENOH_LOG_ERROR_NAMED(
-        "rmw_zenoh_cpp",
-        "Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.",
-        pub_it->second->topic_info().name_.c_str(),
-        id_,
-        ret
-      );
-    }
-  }
-  for (auto sub_it = subs_.begin(); sub_it != subs_.end(); ++sub_it) {
-    ret = sub_it->second->shutdown();
-    if (ret != RMW_RET_OK) {
-      RMW_ZENOH_LOG_ERROR_NAMED(
-        "rmw_zenoh_cpp",
-        "Unable to shutdown subscription %s within id %zu. rmw_ret_t code: %zu.",
-        sub_it->second->topic_info().name_.c_str(),
-        id_,
-        ret
-      );
-    }
-  }
-  for (auto srv_it = services_.begin(); srv_it != services_.end(); ++srv_it) {
-    ret = srv_it->second->shutdown();
-    if (ret != RMW_RET_OK) {
-      RMW_ZENOH_LOG_ERROR_NAMED(
-        "rmw_zenoh_cpp",
-        "Unable to shutdown service %s within id %zu. rmw_ret_t code: %zu.",
-        srv_it->second->topic_info().name_.c_str(),
-        id_,
-        ret
-      );
-    }
-  }
-  for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) {
-    ret = cli_it->second->shutdown();
-    if (ret != RMW_RET_OK) {
-      RMW_ZENOH_LOG_ERROR_NAMED(
-        "rmw_zenoh_cpp",
-        "Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.",
-        cli_it->second->topic_info().name_.c_str(),
-        id_,
-        ret
-      );
-    }
-  }
-
   // Unregister this node from the ROS graph.
   zc_liveliness_undeclare_token(z_move(token_));
 

From d0447d3395a0784d6e8293a02f5b4d8055b2f9fc Mon Sep 17 00:00:00 2001
From: Chris Lalancette
Date: Wed, 20 Nov 2024 17:44:38 -0500
Subject: [PATCH 2/3] Fix UB in ClientData stuff. (#320)

The num_in_flight accounting was *still* wrong here.  First of all, we
forgot to increment num_in_flight when actually kicking off a new query.
Once we did that, we had to change the lock in NodeData to a recursive
one, since delete_client_data can be called recursively from within
ClientData.  And finally, we had to drop the ClientData lock before
calling delete_client_data, since that call deletes the ClientData
itself, and the subsequent unlock would have been UB.
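
The unlock-before-delete shape, roughly (a simplified sketch of
decrement_in_flight_and_conditionally_remove, not the verbatim code):

    std::unique_lock<std::mutex> lock(mutex_);
    --num_in_flight_;
    if (is_shutdown_ && num_in_flight_ == 0) {
      // (lookup of the owning NodeData elided)
      // Unlock first: delete_client_data() destroys this ClientData, so
      // letting the guard release a destroyed mutex would be UB.
      lock.unlock();
      node_data->delete_client_data(rmw_client_);
    }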

Signed-off-by: Chris Lalancette
---
 rmw_zenoh_cpp/src/detail/rmw_client_data.cpp |  5 +++-
 rmw_zenoh_cpp/src/detail/rmw_node_data.cpp   | 30 ++++++++++----------
 rmw_zenoh_cpp/src/detail/rmw_node_data.hpp   |  2 +-
 3 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
index cf0fcb4e..7bfe4c16 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
@@ -471,6 +471,7 @@ rmw_ret_t ClientData::send_request(
   opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
   // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
   // capture shared_from_this() instead of this.
+  num_in_flight_++;
   z_owned_closure_reply_t zn_closure_reply =
     z_closure(client_data_handler, client_data_drop, this);
   z_get(
@@ -563,7 +564,7 @@ bool ClientData::shutdown_and_query_in_flight()
 ///=============================================================================
 void ClientData::decrement_in_flight_and_conditionally_remove()
 {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::unique_lock<std::mutex> lock(mutex_);
   --num_in_flight_;
 
   if (is_shutdown_ && num_in_flight_ == 0) {
@@ -575,6 +576,8 @@
     if (node_data == nullptr) {
       return;
     }
+    // We have to unlock here since we are about to delete ourselves, and thus the unlock would be UB.
+    lock.unlock();
     node_data->delete_client_data(rmw_client_);
   }
 }
diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
index 0fabfd95..56e88143 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
@@ -115,7 +115,7 @@ NodeData::~NodeData()
 ///=============================================================================
 std::size_t NodeData::id() const
 {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   return id_;
 }
 
@@ -128,7 +128,7 @@ bool NodeData::create_pub_data(
   const rosidl_message_type_support_t * type_support,
   const rmw_qos_profile_t * qos_profile)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   if (is_shutdown_) {
     RMW_ZENOH_LOG_ERROR_NAMED(
       "rmw_zenoh_cpp",
@@ -169,7 +169,7 @@
 ///=============================================================================
 PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   auto it = pubs_.find(publisher);
   if (it == pubs_.end()) {
     return nullptr;
@@ -181,7 +181,7 @@
 ///=============================================================================
 void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   pubs_.erase(publisher);
 }
 
@@ -195,7 +195,7 @@ bool NodeData::create_sub_data(
   const rosidl_message_type_support_t * type_support,
   const rmw_qos_profile_t * qos_profile)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   if (is_shutdown_) {
     RMW_ZENOH_LOG_ERROR_NAMED(
       "rmw_zenoh_cpp",
@@ -237,7 +237,7 @@
 ///=============================================================================
 SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subscription)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   auto it = subs_.find(subscription);
   if (it == subs_.end()) {
     return nullptr;
@@ -249,7 +249,7 @@
 ///=============================================================================
 void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   subs_.erase(subscription);
 }
 
@@ -262,7 +262,7 @@ bool NodeData::create_service_data(
   const rosidl_service_type_support_t * type_supports,
   const rmw_qos_profile_t * qos_profile)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   if (is_shutdown_) {
     RMW_ZENOH_LOG_ERROR_NAMED(
       "rmw_zenoh_cpp",
@@ -303,7 +303,7 @@
 ///=============================================================================
 ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   auto it = services_.find(service);
   if (it == services_.end()) {
     return nullptr;
@@ -315,7 +315,7 @@
 ///=============================================================================
 void NodeData::delete_service_data(const rmw_service_t * const service)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   services_.erase(service);
 }
 
@@ -329,7 +329,7 @@ bool NodeData::create_client_data(
   const rosidl_service_type_support_t * type_supports,
   const rmw_qos_profile_t * qos_profile)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   if (is_shutdown_) {
     RMW_ZENOH_LOG_ERROR_NAMED(
       "rmw_zenoh_cpp",
@@ -371,7 +371,7 @@
 ///=============================================================================
 ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   auto it = clients_.find(client);
   if (it == clients_.end()) {
     return nullptr;
@@ -383,7 +383,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
 ///=============================================================================
 void NodeData::delete_client_data(const rmw_client_t * const client)
 {
-  std::lock_guard<std::mutex> lock_guard(mutex_);
+  std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
   auto client_it = clients_.find(client);
   if (client_it == clients_.end()) {
     return;
@@ -396,7 +396,7 @@ void NodeData::delete_client_data(const rmw_client_t * const client)
 ///=============================================================================
 rmw_ret_t NodeData::shutdown()
 {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   rmw_ret_t ret = RMW_RET_OK;
   if (is_shutdown_) {
     return ret;
@@ -413,7 +413,7 @@ rmw_ret_t NodeData::shutdown()
 // Check if the Node is shutdown.
 bool NodeData::is_shutdown() const
 {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   return is_shutdown_;
 }
 
diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
index f85b1366..5489e4b7 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
@@ -130,7 +130,7 @@ class NodeData final
     std::shared_ptr<liveliness::Entity> entity,
     zc_owned_liveliness_token_t token);
   // Internal mutex.
-  mutable std::mutex mutex_;
+  mutable std::recursive_mutex mutex_;
   // The rmw_node_t associated with this NodeData.
   const rmw_node_t * node_;
   // The entity id of this node as generated by get_next_entity_id().

From 8306a63313cd9e21883344deff9abc782636a464 Mon Sep 17 00:00:00 2001
From: Chris Lalancette
Date: Wed, 20 Nov 2024 17:46:09 -0500
Subject: [PATCH 3/3] Fix the query timestamp. (#321)

In particular, make it the time the query was received in
service_data_handler, not the time it was "taken" (which may be quite a
bit later).  This aligns it with how the client reply timestamp is
generated.
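
As a rough sketch of the change (simplified from the diff below):

    // In service_data_handler(): stamp the query as soon as it arrives...
    std::chrono::nanoseconds::rep received_timestamp =
      std::chrono::system_clock::now().time_since_epoch().count();
    service_data->add_new_query(
      std::make_unique<ZenohQuery>(query, received_timestamp));

    // ...and in take_request(): report that stored stamp instead of "now".
    request_header->received_timestamp = query->get_received_timestamp();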

Signed-off-by: Chris Lalancette
---
 rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 10 ++++++----
 rmw_zenoh_cpp/src/detail/zenoh_utils.cpp      |  9 ++++++++-
 rmw_zenoh_cpp/src/detail/zenoh_utils.hpp      |  5 ++++-
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
index 9be89277..42366ef0 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
@@ -59,7 +59,10 @@ void service_data_handler(const z_query_t * query, void * data)
     return;
   }
 
-  service_data->add_new_query(std::make_unique<ZenohQuery>(query));
+  std::chrono::nanoseconds::rep received_timestamp =
+    std::chrono::system_clock::now().time_since_epoch().count();
+
+  service_data->add_new_query(std::make_unique<ZenohQuery>(query, received_timestamp));
 }
 
 ///=============================================================================
@@ -339,9 +342,8 @@ rmw_ret_t ServiceData::take_request(
     RMW_SET_ERROR_MSG("Could not get client GID from attachment");
     return RMW_RET_ERROR;
   }
-  auto now = std::chrono::system_clock::now().time_since_epoch();
-  auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
-  request_header->received_timestamp = now_ns.count();
+
+  request_header->received_timestamp = query->get_received_timestamp();
 
   // Add this query to the map, so that rmw_send_response can quickly look it up later.
   const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid);
diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
index 2e6c8e0c..e30e80d0 100644
--- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
+++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
@@ -64,9 +64,16 @@ create_map_and_set_sequence_num(
 }
 
 ///=============================================================================
-ZenohQuery::ZenohQuery(const z_query_t * query)
+ZenohQuery::ZenohQuery(const z_query_t * query, std::chrono::nanoseconds::rep received_timestamp)
 {
   query_ = z_query_clone(query);
+  received_timestamp_ = received_timestamp;
+}
+
+///=============================================================================
+std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const
+{
+  return received_timestamp_;
 }
 
 ///=============================================================================
diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
index f7ec26b2..a750cbf2 100644
--- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
+++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
@@ -56,14 +56,17 @@ class ZenohReply final
 class ZenohQuery final
 {
 public:
-  ZenohQuery(const z_query_t * query);
+  ZenohQuery(const z_query_t * query, std::chrono::nanoseconds::rep received_timestamp);
 
   ~ZenohQuery();
 
   const z_query_t get_query() const;
 
+  std::chrono::nanoseconds::rep get_received_timestamp() const;
+
 private:
   z_owned_query_t query_;
+  std::chrono::nanoseconds::rep received_timestamp_;
 };
 
 }  // namespace rmw_zenoh_cpp