From 1ceae7a82b4dce42ba81267780da04e313df8bde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Hern=C3=A1ndez=20Cordero?= Date: Thu, 14 Nov 2024 16:46:56 +0100 Subject: [PATCH] Added zenoh_cpp session and token to subscription service_data publisher_data node_data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Alejandro Hernández Cordero --- .../src/detail/rmw_context_impl_s.cpp | 18 +----- .../src/detail/rmw_context_impl_s.hpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 57 +++++++++++------- rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 12 ++-- .../src/detail/rmw_publisher_data.cpp | 55 ++++++++++------- .../src/detail/rmw_publisher_data.hpp | 6 +- rmw_zenoh_cpp/src/detail/rmw_service_data.cpp | 59 ++++++++++++------- rmw_zenoh_cpp/src/detail/rmw_service_data.hpp | 7 ++- .../src/detail/rmw_subscription_data.cpp | 47 ++++++++++----- .../src/detail/rmw_subscription_data.hpp | 6 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 6 +- 11 files changed, 161 insertions(+), 114 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 22a0cc0d..9f539756 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -134,7 +134,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() // zenoh::Session::LivelinessSubscriberOptions options = zenoh::Session::LivelinessSubscriberOptions::create_default(); // options.history = true; // zenoh::ZResult err; - // auto graph_subscriber_cpp = session_cpp_.liveliness_declare_subscriber( + // graph_subscriber_cpp_ = session_cpp_->liveliness_declare_subscriber( // keyexpr_cpp, // [](const zenoh::Sample& s) { // auto data_ptr = static_cast(static_cast(s.get_attachment().value().get().as_vector().data())); @@ -168,8 +168,6 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() // std::move(options), // &err); - // graph_subscriber_cpp_.push_back(std::move(graph_subscriber_cpp)); - // if (err != Z_OK) { // RMW_SET_ERROR_MSG("unable to create zenoh subscription"); // return RMW_RET_ERROR; @@ -227,8 +225,6 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown() z_undeclare_subscriber(z_move(graph_subscriber_)); - // graph_subscriber_cpp_.clear(); - #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // drop SHM subsystem if used shm_ = std::nullopt; @@ -269,14 +265,6 @@ rmw_context_impl_s::rmw_context_impl_s( throw std::runtime_error("Error setting up zenoh session. " + std::to_string(static_cast(result))); } - // // Check if shm is enabled. - // z_owned_string_t shm_enabled; - // zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); - // auto always_free_shm_enabled = rcpputils::make_scope_exit( - // [&shm_enabled]() { - // z_drop(z_move(shm_enabled)); - // }); - rmw_ret_t ret; // TODO(Yadunund) Move this check into a separate thread. @@ -340,8 +328,6 @@ rmw_context_impl_s::rmw_context_impl_s( for (auto res = replies.recv(); std::holds_alternative(res); res = replies.recv()) { const auto &sample = std::get(res).get_ok(); graph_cache->parse_put(sample.get_payload().as_string(), true); - std::cout << "Received ('" << sample.get_keyexpr().as_string_view() << "' : '" - << sample.get_payload().as_string() << "')\n"; } #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY @@ -478,7 +464,7 @@ bool rmw_context_impl_s::create_node_data( auto node_data = rmw_zenoh_cpp::NodeData::make( node, this->get_next_entity_id(), - z_loan(data_->session_), + data_->session_cpp_, data_->domain_id_, ns, node_name, diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 48f6f5a2..ef5a79cf 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -141,7 +141,7 @@ class rmw_context_impl_s final std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. z_owned_subscriber_t graph_subscriber_; - std::vector> graph_subscriber_cpp_; + std::optional> graph_subscriber_cpp_; // Equivalent to rmw_dds_common::Context's guard condition. // Guard condition that should be triggered when the graph changes. std::unique_ptr graph_guard_condition_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index f4eb0518..11131231 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -30,15 +30,15 @@ namespace rmw_zenoh_cpp std::shared_ptr NodeData::make( const rmw_node_t * const node, std::size_t id, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, const std::string & enclave) { // Create the entity. - auto entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), + auto entity = rmw_zenoh_cpp::liveliness::Entity::make_cpp( + session->get_zid(), std::to_string(id), std::to_string(id), rmw_zenoh_cpp::liveliness::EntityType::Node, @@ -58,20 +58,33 @@ std::shared_ptr NodeData::make( // Create the liveliness token. std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - zc_owned_liveliness_token_t token; - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - if (zc_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { + zenoh::ZResult err; + auto token = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + + if (err != Z_OK) + { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the node."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); return nullptr; } - free_token.cancel(); + // z_view_keyexpr_t liveliness_ke; + // z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + // zc_owned_liveliness_token_t token; + // auto free_token = rcpputils::make_scope_exit( + // [&token]() { + // z_drop(z_move(token)); + // }); + // if (zc_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { + // RMW_ZENOH_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to create liveliness token for the node."); + // return nullptr; + // } + // free_token.cancel(); return std::shared_ptr( new NodeData{ @@ -87,7 +100,7 @@ NodeData::NodeData( const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - zc_owned_liveliness_token_t token) + zenoh::LivelinessToken token) : node_(node), id_(std::move(id)), entity_(std::move(entity)), @@ -121,7 +134,7 @@ std::size_t NodeData::id() const ///============================================================================= bool NodeData::create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -143,7 +156,7 @@ bool NodeData::create_pub_data( } auto pub_data = PublisherData::make( - std::move(session), + session, node_, entity_->node_info(), id_, @@ -187,7 +200,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= bool NodeData::create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -210,7 +223,7 @@ bool NodeData::create_sub_data( } auto sub_data = SubscriptionData::make( - std::move(session), + session, std::move(graph_cache), node_, entity_->node_info(), @@ -255,7 +268,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) ///============================================================================= bool NodeData::create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -277,7 +290,7 @@ bool NodeData::create_service_data( } auto service_data = ServiceData::make( - std::move(session), + session, node_, entity_->node_info(), id_, @@ -366,7 +379,7 @@ rmw_ret_t NodeData::shutdown() } // Unregister this node from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + // zc_liveliness_undeclare_token(z_move(token_)); is_shutdown_ = true; return ret; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index f3da6497..e9a7d924 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -40,7 +40,7 @@ class NodeData final static std::shared_ptr make( const rmw_node_t * const node, std::size_t id, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -52,7 +52,7 @@ class NodeData final // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -67,7 +67,7 @@ class NodeData final // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -83,7 +83,7 @@ class NodeData final // Create a new ServiceData for a given rmw_service_t. bool create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + const std::shared_ptr & session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -110,7 +110,7 @@ class NodeData final const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - zc_owned_liveliness_token_t token); + zenoh::LivelinessToken token); // Internal mutex. mutable std::mutex mutex_; // The rmw_node_t associated with this NodeData. @@ -121,7 +121,7 @@ class NodeData final // The Entity generated for the node. std::shared_ptr entity_; // Liveliness token for the node. - zc_owned_liveliness_token_t token_; + std::optional token_; // Shutdown flag. bool is_shutdown_; // Map of publishers. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index dfac42f4..c28233e7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -39,7 +39,7 @@ namespace rmw_zenoh_cpp ///============================================================================= std::shared_ptr PublisherData::make( - const z_loaned_session_t * session, + const std::shared_ptr & session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -78,8 +78,8 @@ std::shared_ptr PublisherData::make( }); std::size_t domain_id = node_info.domain_id_; - auto entity = liveliness::Entity::make( - z_info_zid(session), + auto entity = liveliness::Entity::make_cpp( + session->get_zid(), std::to_string(node_id), std::to_string(publisher_id), liveliness::EntityType::Publisher, @@ -132,7 +132,7 @@ std::shared_ptr PublisherData::make( ze_owned_publication_cache_t pub_cache_; if (ze_declare_publication_cache( - session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + z_loan(session->_0), &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; @@ -159,31 +159,44 @@ std::shared_ptr PublisherData::make( z_undeclare_publisher(z_move(pub)); }); if (z_declare_publisher( - session, &pub, z_loan(pub_ke), &opts) != Z_OK) + z_loan(session->_0), &pub, z_loan(pub_ke), &opts) != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - zc_owned_liveliness_token_t token; - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - if (zc_liveliness_declare_token( - session, &token, z_loan(liveliness_ke), - NULL) != Z_OK) + zenoh::ZResult err; + auto token = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the publisher."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); return nullptr; } - - free_token.cancel(); + // z_view_keyexpr_t liveliness_ke; + // z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + // zc_owned_liveliness_token_t token; + // auto free_token = rcpputils::make_scope_exit( + // [&token]() { + // z_drop(z_move(token)); + // }); + // if (zc_liveliness_declare_token( + // session, &token, z_loan(liveliness_ke), + // NULL) != Z_OK) + // { + // RMW_ZENOH_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to create liveliness token for the publisher."); + // return nullptr; + // } + + // free_token.cancel(); undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); @@ -205,7 +218,7 @@ PublisherData::PublisherData( std::shared_ptr entity, z_owned_publisher_t pub, std::optional pub_cache, - zc_owned_liveliness_token_t token, + zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), @@ -455,7 +468,7 @@ rmw_ret_t PublisherData::shutdown() } // Unregister this publisher from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + // zc_liveliness_undeclare_token(z_move(token_)); if (pub_cache_.has_value()) { z_drop(z_move(pub_cache_.value())); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index eca222b7..4e537df0 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -39,7 +39,7 @@ class PublisherData final public: // Make a shared_ptr of PublisherData. static std::shared_ptr make( - const z_loaned_session_t * session, + const std::shared_ptr & session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -92,7 +92,7 @@ class PublisherData final std::shared_ptr entity, z_owned_publisher_t pub, std::optional pub_cache, - zc_owned_liveliness_token_t token, + zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support); @@ -107,7 +107,7 @@ class PublisherData final // Optional publication cache when durability is transient_local. std::optional pub_cache_; // Liveliness token for the publisher. - zc_owned_liveliness_token_t token_; + std::optional token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index 5cb53d58..66721511 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -32,6 +32,8 @@ #include "rmw/error_handling.h" +#include + namespace rmw_zenoh_cpp { ///============================================================================== @@ -59,7 +61,7 @@ void service_data_handler(z_loaned_query_t * query, void * data) ///============================================================================= std::shared_ptr ServiceData::make( - const z_loaned_session_t * session, + const std::shared_ptr & session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -119,8 +121,8 @@ std::shared_ptr ServiceData::make( }); std::size_t domain_id = node_info.domain_id_; - auto entity = liveliness::Entity::make( - z_info_zid(session), + auto entity = liveliness::Entity::make_cpp( + session->get_zid(), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Service, @@ -170,7 +172,7 @@ std::shared_ptr ServiceData::make( z_undeclare_queryable(z_move(service_data->qable_)); }); if (z_declare_queryable( - session, &service_data->qable_, z_loan(service_ke), + z_loan(session->_0), &service_data->qable_, z_loan(service_ke), z_move(callback), &qable_options) != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); @@ -178,29 +180,42 @@ std::shared_ptr ServiceData::make( } std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - if (z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - auto free_token = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_drop(z_move(service_data->token_)); - } - }); - if (zc_liveliness_declare_token( - session, &service_data->token_, z_loan(liveliness_ke), - NULL) != Z_OK) + // z_view_keyexpr_t liveliness_ke; + // if (z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()) != Z_OK) { + // RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + // return nullptr; + // } + // auto free_token = rcpputils::make_scope_exit( + // [service_data]() { + // if (service_data != nullptr) { + // z_drop(z_move(service_data->token_)); + // } + // }); + // if (zc_liveliness_declare_token( + // session, &service_data->token_, z_loan(liveliness_ke), + // NULL) != Z_OK) + // { + // RMW_ZENOH_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to create liveliness token for the service."); + // return nullptr; + // } + zenoh::ZResult err; + service_data->token_ = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the service."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the service."); return nullptr; } undeclare_z_queryable.cancel(); - free_token.cancel(); + // free_token.cancel(); return service_data; } @@ -492,7 +507,7 @@ rmw_ret_t ServiceData::shutdown() } // Unregister this node from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + // zc_liveliness_undeclare_token(z_move(token_)); z_undeclare_queryable(z_move(qable_)); is_shutdown_ = true; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index cddaff88..1384c0a7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -39,6 +39,9 @@ #include "rmw/rmw.h" #include "rmw/ret_types.h" +#include +#include + namespace rmw_zenoh_cpp { @@ -48,7 +51,7 @@ class ServiceData final public: // Make a shared_ptr of ServiceData. static std::shared_ptr make( - const z_loaned_session_t * session, + const std::shared_ptr & session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -113,7 +116,7 @@ class ServiceData final // An owned queryable. z_owned_queryable_t qable_; // Liveliness token for the service. - zc_owned_liveliness_token_t token_; + std::optional token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 8d2c8805..9658a5c1 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -91,7 +91,7 @@ SubscriptionData::Message::~Message() ///============================================================================= std::shared_ptr SubscriptionData::make( - const z_loaned_session_t * session, + const std::shared_ptr & session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -132,8 +132,8 @@ std::shared_ptr SubscriptionData::make( // Everything above succeeded and is setup properly. Now declare a subscriber // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; - auto entity = liveliness::Entity::make( - z_info_zid(session), + auto entity = liveliness::Entity::make_cpp( + session->get_zid(), std::to_string(node_id), std::to_string(subscription_id), liveliness::EntityType::Subscription, @@ -320,24 +320,39 @@ bool SubscriptionData::init() // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - - auto free_token = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(token_)); - }); - if (zc_liveliness_declare_token( - context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) + // z_view_keyexpr_t liveliness_ke; + // z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); + + // auto free_token = rcpputils::make_scope_exit( + // [this]() { + // z_drop(z_move(token_)); + // }); + // if (zc_liveliness_declare_token( + // context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) + // { + // RMW_ZENOH_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to create liveliness token for the subscription."); + // return false; + // } + + // Create the liveliness token. + zenoh::ZResult err; + token_ = context_impl->session_cpp()->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the subscription."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); return false; } undeclare_z_sub.cancel(); - free_token.cancel(); + // free_token.cancel(); initialized_ = true; @@ -396,7 +411,7 @@ rmw_ret_t SubscriptionData::shutdown() graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); // Unregister this subscription from the ROS graph. - zc_liveliness_undeclare_token(z_move(token_)); + // zc_liveliness_undeclare_token(z_move(token_)); z_owned_subscriber_t * sub = std::get_if(&sub_); if (sub != nullptr) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 4f864994..03bbebba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -41,6 +41,8 @@ #include "rmw/rmw.h" #include "rmw/ret_types.h" +#include + namespace rmw_zenoh_cpp { ///============================================================================= @@ -63,7 +65,7 @@ class SubscriptionData final : public std::enable_shared_from_this make( - const z_loaned_session_t * session, + const std::shared_ptr & session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -143,7 +145,7 @@ class SubscriptionData final : public std::enable_shared_from_this sub_; // Liveliness token for the subscription. - zc_owned_liveliness_token_t token_; + std::optional token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5088980b..0d30ac8d 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -421,7 +421,7 @@ rmw_create_publisher( if (!node_data->create_pub_data( rmw_publisher, - context_impl->session(), + context_impl->session_cpp(), context_impl->get_next_entity_id(), topic_name, type_support, @@ -953,7 +953,7 @@ rmw_create_subscription( if (!node_data->create_sub_data( rmw_subscription, - context_impl->session(), + context_impl->session_cpp(), context_impl->graph_cache(), context_impl->get_next_entity_id(), topic_name, @@ -2007,7 +2007,7 @@ rmw_create_service( if (!node_data->create_service_data( rmw_service, - context_impl->session(), + context_impl->session_cpp(), context_impl->get_next_entity_id(), service_name, type_support,