From 8bca9d744c9ca6cd0f56d98406c40acc57fa3558 Mon Sep 17 00:00:00 2001 From: Yuyuan Yuan Date: Wed, 11 Dec 2024 03:18:20 +0800 Subject: [PATCH 1/2] refactor: wrap the zenoh session with a shared pointer (#333) * refactor: wrap the zenoh session with a shared pointer * fixup! refactor: wrap the zenoh session with a shared pointer * chore: remove the automatically added headers * fix: apply the suggested change * fix: hold the session at the right place --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 15 ++++--- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 7 ++- .../src/detail/rmw_context_impl_s.cpp | 43 ++++++++++--------- .../src/detail/rmw_context_impl_s.hpp | 4 +- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 8 ++-- rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 8 ++-- .../src/detail/rmw_publisher_data.cpp | 16 ++++--- .../src/detail/rmw_publisher_data.hpp | 6 ++- rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 12 ++++-- rmw_zenoh_cpp/src/detail/rmw_service_data.hpp | 5 ++- .../src/detail/rmw_subscription_data.cpp | 16 ++++--- .../src/detail/rmw_subscription_data.hpp | 6 ++- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 14 ++++++ rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 15 +++++++ 14 files changed, 120 insertions(+), 55 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index a95c98a2..f202ad29 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -105,7 +105,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr ClientData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -167,7 +167,7 @@ std::shared_ptr ClientData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Client, @@ -192,6 +192,7 @@ std::shared_ptr ClientData::make( node, client, entity, + session, request_members, response_members, request_type_support, @@ -211,6 +212,7 @@ ClientData::ClientData( const rmw_node_t * rmw_node, const rmw_client_t * rmw_client, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, @@ -218,6 +220,7 @@ ClientData::ClientData( : rmw_node_(rmw_node), rmw_client_(rmw_client), entity_(std::move(entity)), + sess_(std::move(sess)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(request_type_support), @@ -232,7 +235,7 @@ ClientData::ClientData( } ///============================================================================= -bool ClientData::init(const z_loaned_session_t * session) +bool ClientData::init(std::shared_ptr session) { if (z_keyexpr_from_str( &this->keyexpr_, @@ -250,7 +253,7 @@ bool ClientData::init(const z_loaned_session_t * session) z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); if (z_liveliness_declare_token( - session, + session->loan(), &this->token_, z_loan(liveliness_ke), NULL @@ -266,6 +269,7 @@ bool ClientData::init(const z_loaned_session_t * session) z_drop(z_move(this->token_)); }); + sess_ = session; initialized_ = true; free_ros_keyexpr.cancel(); @@ -470,7 +474,7 @@ rmw_ret_t ClientData::send_request( z_owned_closure_reply_t zn_closure_reply; z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); z_get( - context_impl->session(), + sess_->loan(), z_loan(keyexpr_), "", z_move(zn_closure_reply), &opts); @@ -535,6 +539,7 @@ void ClientData::_shutdown() z_drop(z_move(keyexpr_)); } + sess_.reset(); is_shutdown_ = true; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index bc737435..349a26db 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -47,7 +47,7 @@ class ClientData final : public std::enable_shared_from_this public: // Make a shared_ptr of ClientData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -113,13 +113,14 @@ class ClientData final : public std::enable_shared_from_this const rmw_node_t * rmw_node, const rmw_client_t * client, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::shared_ptr request_type_support, std::shared_ptr response_type_support); // Initialize the Zenoh objects for this entity. - bool init(const z_loaned_session_t * session); + bool init(std::shared_ptr session); // Shutdown this client (the mutex is expected to be held by the caller). void _shutdown(); @@ -131,6 +132,8 @@ class ClientData final : public std::enable_shared_from_this const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned keyexpression. z_owned_keyexpr_t keyexpr_; // Liveliness token for the service. diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 9458eb45..944f5ff5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -36,6 +36,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "zenoh_utils.hpp" // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration @@ -86,13 +87,18 @@ class rmw_context_impl_s::Data final }); // Initialize the zenoh session. - if (z_open(&session_, z_move(config), NULL) != Z_OK) { + z_owned_session_t raw_session; + if (z_open(&raw_session, z_move(config), NULL) != Z_OK) { RMW_SET_ERROR_MSG("Error setting up zenoh session."); throw std::runtime_error("Error setting up zenoh session."); } + if (session_ != nullptr) { + session_.reset(); + } + session_ = std::make_shared(raw_session); auto close_session = rcpputils::make_scope_exit( - [this]() { - z_close(z_loan_mut(session_), NULL); + [&raw_session]() { + z_close(z_loan_mut(raw_session), NULL); }); // Verify if the zenoh router is running if configured. @@ -102,7 +108,7 @@ class rmw_context_impl_s::Data final uint64_t connection_attempts = 0; constexpr std::chrono::milliseconds sleep_time(1000); constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); - while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) { if ((connection_attempts % ticks_between_print) == 0) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -117,7 +123,7 @@ class rmw_context_impl_s::Data final } // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session_)); + const z_id_t zid = z_info_zid(session_->loan()); graph_cache_ = std::make_shared(zid); // Setup liveliness subscriptions for discovery. std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); @@ -144,7 +150,7 @@ class rmw_context_impl_s::Data final z_view_keyexpr_t keyexpr; z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); z_liveliness_get( - z_loan(session_), z_loan(keyexpr), + session_->loan(), z_loan(keyexpr), z_move(closure), NULL); z_owned_reply_t reply; while (z_recv(z_loan(handler), &reply) == Z_OK) { @@ -203,7 +209,7 @@ class rmw_context_impl_s::Data final z_view_keyexpr_t liveliness_ke; z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); if (z_liveliness_declare_subscriber( - z_loan(session_), + session_->loan(), &graph_subscriber_, z_loan(liveliness_ke), z_move(callback), &sub_options) != Z_OK) { @@ -240,11 +246,8 @@ class rmw_context_impl_s::Data final // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). } - // Close the zenoh session - if (z_close(z_loan_mut(session_), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } + // Drop the shared session. + session_.reset(); return RMW_RET_OK; } @@ -255,10 +258,10 @@ class rmw_context_impl_s::Data final return enclave_; } - const z_loaned_session_t * session() const + std::shared_ptr session() const { std::lock_guard lock(mutex_); - return z_loan(session_); + return session_; } std::optional & shm_provider() @@ -288,7 +291,7 @@ class rmw_context_impl_s::Data final bool session_is_valid() const { std::lock_guard lock(mutex_); - return !z_session_is_closed(z_loan(session_)); + return !z_session_is_closed(session_->loan()); } std::shared_ptr graph_cache() @@ -309,7 +312,7 @@ class rmw_context_impl_s::Data final } // Check that the Zenoh session is still valid. - if (z_session_is_closed(z_loan(session_))) { + if (z_session_is_closed(session_->loan())) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create NodeData as Zenoh session is invalid."); @@ -319,7 +322,7 @@ class rmw_context_impl_s::Data final auto node_data = rmw_zenoh_cpp::NodeData::make( node, this->get_next_entity_id(), - z_loan(session_), + session_->loan(), domain_id_, ns, node_name, @@ -395,8 +398,8 @@ class rmw_context_impl_s::Data final std::size_t domain_id_; // Enclave, name used to find security artifacts in a sros2 keystore. std::string enclave_; - // An owned session. - z_owned_session_t session_; + // A shared session. + std::shared_ptr session_{nullptr}; // An optional SHM manager that is initialized of SHM is enabled in the // zenoh session config. std::optional shm_provider_; @@ -472,7 +475,7 @@ std::string rmw_context_impl_s::enclave() const } ///============================================================================= -const z_loaned_session_t * rmw_context_impl_s::session() const +std::shared_ptr rmw_context_impl_s::session() const { return data_->session(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 1e46d6af..ea66e0c3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -47,8 +47,8 @@ class rmw_context_impl_s final // Get a copy of the enclave. std::string enclave() const; - // Loan the Zenoh session. - const z_loaned_session_t * session() const; + // Share the Zenoh session. + std::shared_ptr session() const; // Get a reference to the shm_provider. // Note: This is not thread-safe. diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index cb4817a5..1255de21 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -121,7 +121,7 @@ std::size_t NodeData::id() const ///============================================================================= bool NodeData::create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -187,7 +187,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= bool NodeData::create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -255,7 +255,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) ///============================================================================= bool NodeData::create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -322,7 +322,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service) ///============================================================================= bool NodeData::create_client_data( const rmw_client_t * const client, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index ab17e096..4429b589 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -55,7 +55,7 @@ class NodeData final // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + std::shared_ptr sess, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -70,7 +70,7 @@ class NodeData final // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + std::shared_ptr sess, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -86,7 +86,7 @@ class NodeData final // Create a new ServiceData for a given rmw_service_t. bool create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -101,7 +101,7 @@ class NodeData final // Create a new ClientData for a given rmw_client_t. bool create_client_data( const rmw_client_t * const client, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index d1449b63..f54c50ab 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -42,7 +42,7 @@ namespace rmw_zenoh_cpp ///============================================================================= std::shared_ptr PublisherData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -82,7 +82,7 @@ std::shared_ptr PublisherData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(publisher_id), liveliness::EntityType::Publisher, @@ -129,7 +129,7 @@ std::shared_ptr PublisherData::make( ze_owned_publication_cache_t pub_cache_; if (ze_declare_publication_cache( - session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + session->loan(), &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; @@ -158,7 +158,7 @@ std::shared_ptr PublisherData::make( z_owned_publisher_t pub; // TODO(clalancette): What happens if the key name is a valid but empty string? if (z_declare_publisher( - session, &pub, z_loan(pub_ke), &opts) != Z_OK) + session->loan(), &pub, z_loan(pub_ke), &opts) != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; @@ -173,7 +173,7 @@ std::shared_ptr PublisherData::make( z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); z_owned_liveliness_token_t token; if (z_liveliness_declare_token( - session, &token, z_loan(liveliness_ke), + session->loan(), &token, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -194,6 +194,7 @@ std::shared_ptr PublisherData::make( new PublisherData{ node, std::move(entity), + std::move(session), std::move(pub), std::move(pub_cache), std::move(token), @@ -206,6 +207,7 @@ std::shared_ptr PublisherData::make( PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, z_owned_publisher_t pub, std::optional pub_cache, z_owned_liveliness_token_t token, @@ -213,6 +215,7 @@ PublisherData::PublisherData( std::unique_ptr type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), pub_(std::move(pub)), pub_cache_(std::move(pub_cache)), token_(std::move(token)), @@ -439,10 +442,11 @@ rmw_ret_t PublisherData::shutdown() // Unregister this publisher from the ROS graph. z_liveliness_undeclare_token(z_move(token_)); if (pub_cache_.has_value()) { - z_drop(z_move(pub_cache_.value())); + ze_undeclare_publication_cache(z_move(pub_cache_.value())); } z_undeclare_publisher(z_move(pub_)); + sess_.reset(); is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index bde0c13b..1853ff8a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -28,6 +28,7 @@ #include "liveliness_utils.hpp" #include "message_type_support.hpp" #include "type_support_common.hpp" +#include "zenoh_utils.hpp" #include "rcutils/allocator.h" @@ -42,7 +43,7 @@ class PublisherData final public: // Make a shared_ptr of PublisherData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -90,6 +91,7 @@ class PublisherData final PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, z_owned_publisher_t pub, std::optional pub_cache, z_owned_liveliness_token_t token, @@ -102,6 +104,8 @@ class PublisherData final const rmw_node_t * rmw_node_; // The Entity generated for the publisher. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned publisher. z_owned_publisher_t pub_; // Optional publication cache when durability is transient_local. diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index c7be982c..3d456a02 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -65,7 +65,7 @@ void service_data_handler(z_loaned_query_t * query, void * data) ///============================================================================= std::shared_ptr ServiceData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -126,7 +126,7 @@ std::shared_ptr ServiceData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Service, @@ -150,6 +150,7 @@ std::shared_ptr ServiceData::make( new ServiceData{ node, std::move(entity), + session, request_members, response_members, std::move(request_type_support), @@ -180,7 +181,7 @@ std::shared_ptr ServiceData::make( z_queryable_options_default(&qable_options); qable_options.complete = true; if (z_declare_queryable( - session, &service_data->qable_, z_loan(service_data->keyexpr_), + session->loan(), &service_data->qable_, z_loan(service_data->keyexpr_), z_move(callback), &qable_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); @@ -206,7 +207,7 @@ std::shared_ptr ServiceData::make( } }); if (z_liveliness_declare_token( - session, &service_data->token_, z_loan(liveliness_ke), + session->loan(), &service_data->token_, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( @@ -228,12 +229,14 @@ std::shared_ptr ServiceData::make( ServiceData::ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, std::unique_ptr response_type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(std::move(request_type_support)), @@ -526,6 +529,7 @@ rmw_ret_t ServiceData::shutdown() z_undeclare_queryable(z_move(qable_)); } + sess_.reset(); is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index 75467494..a8bcbd66 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -47,7 +47,7 @@ class ServiceData final public: // Make a shared_ptr of ServiceData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -99,6 +99,7 @@ class ServiceData final ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr sess, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, @@ -112,6 +113,8 @@ class ServiceData final std::shared_ptr entity_; // An owned keyexpression. z_owned_keyexpr_t keyexpr_; + // A shared session. + std::shared_ptr sess_; // An owned queryable. z_owned_queryable_t qable_; // Liveliness token for the service. diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index f0036606..4f21300b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -94,7 +94,7 @@ SubscriptionData::Message::~Message() ///============================================================================= std::shared_ptr SubscriptionData::make( - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -136,7 +136,7 @@ std::shared_ptr SubscriptionData::make( // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + z_info_zid(session->loan()), std::to_string(node_id), std::to_string(subscription_id), liveliness::EntityType::Subscription, @@ -161,6 +161,7 @@ std::shared_ptr SubscriptionData::make( node, graph_cache, std::move(entity), + std::move(session), type_support->data, std::move(message_type_support) }); @@ -178,11 +179,13 @@ SubscriptionData::SubscriptionData( const rmw_node_t * rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, + std::shared_ptr sess, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), + sess_(std::move(sess)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), last_known_published_msg_({}), @@ -212,6 +215,8 @@ bool SubscriptionData::init() rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + sess_ = context_impl->session(); + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub @@ -238,7 +243,7 @@ bool SubscriptionData::init() sub_options.query_consolidation = z_query_consolidation_none(); ze_owned_querying_subscriber_t sub; if (ze_declare_querying_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) + sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; @@ -292,7 +297,7 @@ bool SubscriptionData::init() z_owned_subscriber_t sub; if (z_declare_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), + sess_->loan(), &sub, z_loan(sub_ke), z_move(callback), &sub_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); @@ -325,7 +330,7 @@ bool SubscriptionData::init() z_drop(z_move(token_)); }); if (z_liveliness_declare_token( - context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) + sess_->loan(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -422,6 +427,7 @@ rmw_ret_t SubscriptionData::shutdown() } } + sess_.reset(); is_shutdown_ = true; initialized_ = false; return ret; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 84d9a488..08a09def 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -34,6 +34,7 @@ #include "message_type_support.hpp" #include "attachment_helpers.hpp" #include "type_support_common.hpp" +#include "zenoh_utils.hpp" #include "rcutils/allocator.h" @@ -62,7 +63,7 @@ class SubscriptionData final : public std::enable_shared_from_this make( - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -126,6 +127,7 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache, std::shared_ptr entity, + std::shared_ptr sess, const void * type_support_impl, std::unique_ptr type_support); @@ -139,6 +141,8 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache_; // The Entity generated for the subscription. std::shared_ptr entity_; + // A shared session + std::shared_ptr sess_; // An owned subscriber or querying_subscriber depending on the QoS settings. std::variant sub_; // Liveliness token for the subscription. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 77bb4dc2..be5721b7 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -24,6 +24,20 @@ namespace rmw_zenoh_cpp { +/// Loan the zenoh session. +///============================================================================= +const z_loaned_session_t * ZenohSession::loan() +{ + return z_loan(inner_); +} + +/// Close the zenoh session if destructed. +///============================================================================= +ZenohSession::~ZenohSession() +{ + z_close(z_loan_mut(inner_), NULL); +} + ///============================================================================= void create_map_and_set_sequence_num( z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 5f569082..ae2aed3a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -25,6 +25,21 @@ namespace rmw_zenoh_cpp { + +/// A wrapped zenoh session with customized destruction. +///============================================================================= +class ZenohSession final +{ +public: + ZenohSession(z_owned_session_t sess) + : inner_(sess) {} + const z_loaned_session_t * loan(); + ~ZenohSession(); + +private: + z_owned_session_t inner_; +}; + ///============================================================================= void create_map_and_set_sequence_num( From ca5058cae9eaa8aa4ef813763aa3f183234dce3b Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 11 Dec 2024 00:34:38 -0500 Subject: [PATCH 2/2] Another fix for ClientData. (#336) * Another fix for ClientData. Signed-off-by: Chris Lalancette * Minor optimizations Signed-off-by: Yadunund --------- Signed-off-by: Chris Lalancette Signed-off-by: Yadunund Co-authored-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_client_data.cpp | 73 +++++++------------- rmw_zenoh_cpp/src/detail/rmw_client_data.hpp | 10 --- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 8 +-- 3 files changed, 25 insertions(+), 66 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index f202ad29..1236d453 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -42,12 +42,22 @@ namespace { +///============================================================================= +struct ClientDataWrapper +{ + explicit ClientDataWrapper(std::shared_ptr data) + : client_data(std::move(data)) + { + } + + std::shared_ptr client_data; +}; ///============================================================================= void client_data_handler(z_loaned_reply_t * reply, void * data) { - auto client_data = static_cast(data); - if (client_data == nullptr) { + auto wrapper = static_cast(data); + if (wrapper == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain client_data_t from data in client_data_handler." @@ -55,7 +65,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) return; } - if (client_data->is_shutdown()) { + if (wrapper->client_data->is_shutdown()) { return; } @@ -69,7 +79,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - client_data->topic_info().topic_keyexpr_.c_str(), + wrapper->client_data->topic_info().topic_keyexpr_.c_str(), static_cast(z_string_len(z_loan(err_str))), z_string_data(z_loan(err_str))); z_drop(z_move(err_str)); @@ -80,15 +90,15 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data->add_new_reply( + wrapper->client_data->add_new_reply( std::make_unique(reply, received_timestamp)); } ///============================================================================= void client_data_drop(void * data) { - auto client_data = static_cast(data); - if (client_data == nullptr) { + auto wrapper = static_cast(data); + if (wrapper == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain client_data_t from data in client_data_drop." @@ -96,7 +106,7 @@ void client_data_drop(void * data) return; } - client_data->decrement_in_flight_and_conditionally_remove(); + delete wrapper; } } // namespace @@ -228,8 +238,7 @@ ClientData::ClientData( wait_set_data_(nullptr), sequence_number_(1), is_shutdown_(false), - initialized_(false), - num_in_flight_(0) + initialized_(false) { // Do nothing. } @@ -470,9 +479,9 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. - num_in_flight_++; + ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this()); z_owned_closure_reply_t zn_closure_reply; - z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); + z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper); z_get( sess_->loan(), z_loan(keyexpr_), "", @@ -527,10 +536,11 @@ bool ClientData::detach_condition_and_queue_is_empty() } ///============================================================================= -void ClientData::_shutdown() +rmw_ret_t ClientData::shutdown() { + std::lock_guard lock(mutex_); if (is_shutdown_) { - return; + return RMW_RET_OK; } // Unregister this node from the ROS graph. @@ -541,45 +551,10 @@ void ClientData::_shutdown() sess_.reset(); is_shutdown_ = true; -} -///============================================================================= -rmw_ret_t ClientData::shutdown() -{ - std::lock_guard lock(mutex_); - _shutdown(); return RMW_RET_OK; } -///============================================================================= -bool ClientData::shutdown_and_query_in_flight() -{ - std::lock_guard lock(mutex_); - _shutdown(); - return num_in_flight_ > 0; -} - -///============================================================================= -void ClientData::decrement_in_flight_and_conditionally_remove() -{ - std::unique_lock lock(mutex_); - --num_in_flight_; - - if (is_shutdown_ && num_in_flight_ == 0) { - rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); - if (context_impl == nullptr) { - return; - } - std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); - if (node_data == nullptr) { - return; - } - // We have to unlock here since we are about to delete ourself, and thus the unlock would be UB. - lock.unlock(); - node_data->delete_client_data(rmw_client_); - } -} - ///============================================================================= bool ClientData::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index 349a26db..f582f539 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -95,12 +95,6 @@ class ClientData final : public std::enable_shared_from_this // Shutdown this ClientData. rmw_ret_t shutdown(); - // Shutdown this ClientData, and return whether there are any requests currently in flight. - bool shutdown_and_query_in_flight(); - - // Decrement the in flight requests, and if that drops to 0 remove the client from the node. - void decrement_in_flight_and_conditionally_remove(); - // Check if this ClientData is shutdown. bool is_shutdown() const; @@ -122,9 +116,6 @@ class ClientData final : public std::enable_shared_from_this // Initialize the Zenoh objects for this entity. bool init(std::shared_ptr session); - // Shutdown this client (the mutex is expected to be held by the caller). - void _shutdown(); - // Internal mutex. mutable std::recursive_mutex mutex_; // The parent node. @@ -155,7 +146,6 @@ class ClientData final : public std::enable_shared_from_this bool is_shutdown_; // Whether the object has ever successfully been initialized. bool initialized_; - size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 1255de21..c61daa2a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -383,13 +383,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client) void NodeData::delete_client_data(const rmw_client_t * const client) { std::lock_guard lock_guard(mutex_); - auto client_it = clients_.find(client); - if (client_it == clients_.end()) { - return; - } - if (!client_it->second->shutdown_and_query_in_flight()) { - clients_.erase(client); - } + clients_.erase(client); } ///=============================================================================