From 03c4ee1e67f6c4609ab4bba9c922e7267e165396 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 6 Aug 2024 08:29:26 +0800 Subject: [PATCH] Make rmw_context_impl_s an RAII class Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + .../src/detail/rmw_context_impl_s.cpp | 450 ++++++++++++++++++ .../src/detail/rmw_context_impl_s.hpp | 197 ++++++++ rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 8 - rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 32 -- rmw_zenoh_cpp/src/rmw_event.cpp | 9 +- .../src/rmw_get_node_info_and_types.cpp | 18 +- .../src/rmw_get_service_names_and_types.cpp | 6 +- .../src/rmw_get_topic_endpoint_info.cpp | 10 +- .../src/rmw_get_topic_names_and_types.cpp | 6 +- rmw_zenoh_cpp/src/rmw_init.cpp | 267 +++-------- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 112 ++--- 12 files changed, 790 insertions(+), 326 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index d2145006..78873587 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_context_impl_s.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp new file mode 100644 index 00000000..c182ef11 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -0,0 +1,450 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_context_impl_s.hpp" + +#include + +#include "guard_condition.hpp" +#include "liveliness_utils.hpp" +#include "logging_macros.hpp" +#include "zenoh_config.hpp" + +#include "rcpputils/scope_exit.hpp" +#include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" + +///============================================================================= +void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); + + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to lock data_wp." + ); + return; + } + + // Update the graph cache. + std::lock_guard lock(data_ptr->mutex_); + if (data_ptr->is_shutdown_) { + return; + } + switch (sample->kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + data_ptr->graph_cache_->parse_put(keystr._cstr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + data_ptr->graph_cache_->parse_del(keystr._cstr); + default: + return; + } + + // Trigger the ROS graph guard condition. + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); + } +} + +///============================================================================= +rmw_context_impl_s::Data::Data( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition) +: allocator_(allocator), + enclave_(std::move(enclave)), + session_(std::move(session)), + shm_manager_(std::move(shm_manager)), + graph_guard_condition_(graph_guard_condition), + is_shutdown_(false), + next_entity_id_(0), + is_initialized_(false) +{ + z_id_t zid = z_info_zid(z_loan(session_)); + graph_cache_ = std::make_unique(std::move(zid)); + // Setup liveliness subscriptions for discovery. + liveliness_str_ = rmw_zenoh_cpp::liveliness::subscription_token( + domain_id); + + // Query router/liveliness participants to get graph information before this session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + zc_liveliness_get( + z_loan(session_), z_keyexpr(liveliness_str_.c_str()), + z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache_->parse_put(z_loan(keystr), true); + z_drop(z_move(keystr)); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::Data::subscribe() +{ + if (is_initialized_) { + return RMW_RET_OK; + } + // Setup the liveliness subscriber to receives updates from the ROS graph + // and update the graph cache. + auto sub_options = zc_liveliness_subscriber_options_null(); + z_owned_closure_sample_t callback = z_closure(rmw_context_impl_s::graph_sub_data_handler, nullptr, + this->shared_from_this().get()); + graph_subscriber_ = zc_liveliness_declare_subscriber( + z_loan(session_), + z_keyexpr(liveliness_str_.c_str()), + z_move(callback), + &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); + if (!z_check(graph_subscriber_)) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return RMW_RET_ERROR; + } + + undeclare_z_sub.cancel(); + is_initialized_ = true; + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::Data::shutdown() +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + // Close the zenoh session + if (z_close(z_move(session_)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +rmw_context_impl_s::Data::~Data() +{ + RMW_TRY_DESTRUCTOR( + static_cast( + graph_guard_condition_->data)->~GuardCondition(), + rmw_zenoh_cpp::GuardCondition, ); + if (rcutils_allocator_is_valid(allocator_)) { + allocator_->deallocate(graph_guard_condition_->data, allocator_->state); + allocator_->deallocate(graph_guard_condition_, allocator_->state); + graph_guard_condition_ = nullptr; + } + + auto ret = this->shutdown(); + static_cast(ret); +} + +///============================================================================= +rmw_context_impl_s::rmw_context_impl_s( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition) +{ + data_ = std::make_shared( + allocator, + std::move(domain_id), + std::move(enclave), + std::move(session), + std::move(shm_manager), + graph_guard_condition); + + // TODO(Yadunund): Consider switching to a make() pattern to avoid throwing + // errors. + auto ret = data_->subscribe(); + if (ret != RMW_RET_OK) { + throw std::runtime_error("Unable to subscribe to ROS Graph updates."); + } +} + +///============================================================================= +std::string rmw_context_impl_s::enclave() const +{ + std::lock_guard lock(data_->mutex_); + return data_->enclave_; +} + +///============================================================================= +z_session_t rmw_context_impl_s::session() const +{ + std::lock_guard lock(data_->mutex_); + return z_loan(data_->session_); +} + +///============================================================================= +std::optional & rmw_context_impl_s::shm_manager() +{ + std::lock_guard lock(data_->mutex_); + return data_->shm_manager_; +} + +///============================================================================= +rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_guard_condition_; +} + +///============================================================================= +size_t rmw_context_impl_s::get_next_entity_id() +{ + std::lock_guard lock(data_->mutex_); + return data_->next_entity_id_++; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::shutdown() +{ + return data_->shutdown(); +} + +///============================================================================= +bool rmw_context_impl_s::is_shutdown() const +{ + std::lock_guard lock(data_->mutex_); + return data_->is_shutdown_; +} + +///============================================================================= +bool rmw_context_impl_s::session_is_valid() const +{ + std::lock_guard lock(data_->mutex_); + return z_check(data_->session_); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_node_names( + rcutils_string_array_t * node_names, + rcutils_string_array_t * node_namespaces, + rcutils_string_array_t * enclaves, + rcutils_allocator_t * allocator) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_node_names( + node_names, + node_namespaces, + enclaves, + allocator); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_topic_names_and_types( + allocator, + no_demangle, + topic_names_and_types); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->publisher_count_matched_subscriptions( + publisher, + subscription_count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->subscription_count_matched_publishers( + subscription, + publisher_count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_service_names_and_types( + allocator, + service_names_and_types); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_publishers( + const char * topic_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_publishers( + topic_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_subscriptions( + const char * topic_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_subscriptions( + topic_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_services( + const char * service_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_services( + service_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_clients( + const char * service_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_clients( + service_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_entity_names_and_types_by_node( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * names_and_types) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_entity_names_and_types_by_node( + entity_type, + allocator, + node_name, + node_namespace, + no_demangle, + names_and_types); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_entities_info_by_topic( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_demangle, + rmw_topic_endpoint_info_array_t * endpoints_info) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_entities_info_by_topic( + entity_type, + allocator, + topic_name, + no_demangle, + endpoints_info); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->service_server_is_available( + service_name, + service_type, + is_available); +} + +///============================================================================= +void rmw_context_impl_s::set_qos_event_callback( + rmw_zenoh_cpp::liveliness::ConstEntityPtr entity, + const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, + GraphCacheEventCallback callback) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->set_qos_event_callback( + std::move(entity), + event_type, + std::move(callback)); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp new file mode 100644 index 00000000..15f3a632 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -0,0 +1,197 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CONTEXT_IMPL_S_HPP_ +#define DETAIL__RMW_CONTEXT_IMPL_S_HPP_ + +#include + +#include +#include +#include +#include + +#include "graph_cache.hpp" +#include "liveliness_utils.hpp" + +#include "rcutils/types.h" +#include "rmw/rmw.h" + +///============================================================================= +class rmw_context_impl_s final +{ +public: + using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback; + + // Constructor. + // Once constructed, the context_impl instanced will manage the lifetime + // of these arguments. + rmw_context_impl_s( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition); + + // Get a copy of the enclave. + std::string enclave() const; + + // Loan the Zenoh session. + z_session_t session() const; + + // Get a reference to the shm_manager. + // Note: This is not thread-safe. + // TODO(Yadunund): Remove this API and instead include a publish() API + // that handles the shm_manager once the context manages publishers. + std::optional & shm_manager(); + + // Get the graph guard condition. + rmw_guard_condition_t * graph_guard_condition(); + + // Get a unique id for a new entity. + size_t get_next_entity_id(); + + // Shutdown the Zenoh session. + rmw_ret_t shutdown(); + + // Check if the Zenoh session is shutdown. + bool is_shutdown() const; + + // Returns true if the Zenoh session is valid. + bool session_is_valid() const; + + rmw_ret_t get_node_names( + rcutils_string_array_t * node_names, + rcutils_string_array_t * node_namespaces, + rcutils_string_array_t * enclaves, + rcutils_allocator_t * allocator) const; + + rmw_ret_t get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) const; + + rmw_ret_t publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count); + + rmw_ret_t subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count); + + rmw_ret_t get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const; + + rmw_ret_t count_publishers( + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_subscriptions( + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_services( + const char * service_name, + size_t * count) const; + + rmw_ret_t count_clients( + const char * service_name, + size_t * count) const; + + rmw_ret_t get_entity_names_and_types_by_node( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * names_and_types) const; + + rmw_ret_t get_entities_info_by_topic( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_demangle, + rmw_topic_endpoint_info_array_t * endpoints_info) const; + + rmw_ret_t service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available) const; + + void set_qos_event_callback( + rmw_zenoh_cpp::liveliness::ConstEntityPtr entity, + const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, + GraphCacheEventCallback callback); + +private: + // Bundle all class members into a data struct which can be passed as a + // weak ptr to various threads for thread-safe access without capturing + // "this" ptr by reference. + struct Data : public std::enable_shared_from_this + { + // Constructor. + Data( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition); + + // Subscribe to the ROS graph. + rmw_ret_t subscribe(); + + // Shutdown the Zenoh session. + rmw_ret_t shutdown(); + + // Destructor. + ~Data(); + + // Mutex to lock when accessing members. + mutable std::mutex mutex_; + // RMW allocator. + const rcutils_allocator_t * allocator_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager_; + // Liveliness keyexpr string to subscribe to for ROS graph changes. + std::string liveliness_str_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition + /// Guard condition that should be triggered when the graph changes. + rmw_guard_condition_t * graph_guard_condition_; + /// Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + size_t next_entity_id_; + // Graph cache. + std::unique_ptr graph_cache_; + // True once graph subscriber is initialized. + bool is_initialized_; + }; + + std::shared_ptr data_{nullptr}; + + static void graph_sub_data_handler(const z_sample_t * sample, void * data); +}; + + +#endif // DETAIL__RMW_CONTEXT_IMPL_S_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 51f84b33..5beb6edd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include #include @@ -54,12 +52,6 @@ size_t hash_gid(const rmw_request_id_t & request_id) } } // namespace -///============================================================================= -size_t rmw_context_impl_s::get_next_entity_id() -{ - return next_entity_id_++; -} - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 60e4bd01..5ed480e7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -42,38 +42,6 @@ /// Structs for various type erased data fields. -///============================================================================= -class rmw_context_impl_s final -{ -public: - // Enclave, name used to find security artifacts in a sros2 keystore. - char * enclave; - - // An owned session. - z_owned_session_t session; - - // An optional SHM manager that is initialized of SHM is enabled in the - // zenoh session config. - std::optional shm_manager; - - z_owned_subscriber_t graph_subscriber; - - /// Shutdown flag. - bool is_shutdown; - - // Equivalent to rmw_dds_common::Context's guard condition - /// Guard condition that should be triggered when the graph changes. - rmw_guard_condition_t * graph_guard_condition; - - std::unique_ptr graph_cache; - - size_t get_next_entity_id(); - -private: - // A counter to assign a local id for every entity created in this session. - size_t next_entity_id_{0}; -}; - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 964118ee..17280be7 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -20,6 +20,7 @@ #include "detail/event.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" @@ -41,6 +42,8 @@ rmw_publisher_event_init( static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->entity, RMW_RET_INVALID_ARGUMENT); if (publisher->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { @@ -61,7 +64,7 @@ rmw_publisher_event_init( rmw_event->event_type = event_type; // Register the event with graph cache. - pub_data->context->impl->graph_cache->set_qos_event_callback( + context_impl->set_qos_event_callback( pub_data->entity, zenoh_event_type, [pub_data, @@ -95,6 +98,8 @@ rmw_subscription_event_init( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->entity, RMW_RET_INVALID_ARGUMENT); if (subscription->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { @@ -121,7 +126,7 @@ rmw_subscription_event_init( return RMW_RET_OK; } - sub_data->context->impl->graph_cache->set_qos_event_callback( + sub_data->context->impl->set_qos_event_callback( sub_data->entity, zenoh_event_type, [sub_data, diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index c42295b3..4aadc4d2 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -14,7 +14,7 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -43,7 +43,9 @@ rmw_get_subscriber_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, node_name, @@ -71,7 +73,9 @@ rmw_get_publisher_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, node_name, @@ -98,7 +102,9 @@ rmw_get_service_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Service, allocator, node_name, @@ -125,7 +131,9 @@ rmw_get_client_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Client, allocator, node_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 12415ebe..5dc7cdf3 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -35,8 +35,10 @@ rmw_get_service_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_service_names_and_types( + return context_impl->get_service_names_and_types( allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index a4c4878d..d70ce0c7 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -15,7 +15,7 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -43,7 +43,9 @@ rmw_get_publishers_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entities_info_by_topic( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, topic_name, @@ -69,7 +71,9 @@ rmw_get_subscriptions_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entities_info_by_topic( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, topic_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index db7d972c..477dc7d9 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -37,7 +37,9 @@ rmw_get_topic_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_topic_names_and_types( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_topic_names_and_types( allocator, no_demangle, topic_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 99c6f3ad..2556c718 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -21,6 +21,7 @@ #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "detail/zenoh_router_check.hpp" @@ -42,51 +43,6 @@ extern "C" // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -namespace -{ -void -graph_sub_data_handler(const z_sample_t * sample, void * data) -{ - static_cast(data); - - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); - - // Get the context impl from data. - rmw_context_impl_s * context_impl = static_cast( - data); - if (context_impl == nullptr) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to convert data into context_impl" - ); - return; - } - - switch (sample->kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - context_impl->graph_cache->parse_put(keystr._cstr); - break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - context_impl->graph_cache->parse_del(keystr._cstr); - break; - default: - return; - } - - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(context_impl->graph_guard_condition); - if (RMW_RET_OK != rmw_ret) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition" - ); - } -} -} // namespace - //============================================================================== /// Initialize the middleware with the given options, and yielding an context. rmw_ret_t @@ -123,24 +79,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const rcutils_allocator_t * allocator = &options->allocator; - context->impl = static_cast( - allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl, - "failed to allocate context impl", - return RMW_RET_BAD_ALLOC); - auto free_impl = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW(context->impl, context->impl, return RMW_RET_BAD_ALLOC, rmw_context_impl_t); - auto impl_destructor = rcpputils::make_scope_exit( - [context] { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - context->impl->~rmw_context_impl_t(), rmw_context_impl_t *); - }); - rmw_ret_t ret; if ((ret = rmw_init_options_copy(options, &context->options)) != RMW_RET_OK) { // error already set @@ -154,20 +92,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) } }); - // Set the enclave. - context->impl->enclave = rcutils_strdup(options->enclave, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->enclave, - "failed to allocate enclave", - return RMW_RET_BAD_ALLOC); - auto free_enclave = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->enclave, allocator->state); - }); - - // Initialize context's implementation - context->impl->is_shutdown = false; - // If not already defined, set the logging environment variable for Zenoh sessions // to warning level by default. // TODO(Yadunund): Switch to rcutils_get_env once it supports not overwriting values. @@ -189,26 +113,23 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Check if shm is enabled. z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto free_shm_enabled = rcpputils::make_scope_exit( + auto always_free_shm_enabled = rcpputils::make_scope_exit( [&shm_enabled]() { z_drop(z_move(shm_enabled)); }); // Initialize the zenoh session. - context->impl->session = z_open(z_move(config)); - if (!z_session_check(&context->impl->session)) { + z_owned_session_t session = z_open(z_move(config)); + if (!z_session_check(&session)) { RMW_SET_ERROR_MSG("Error setting up zenoh session"); return RMW_RET_ERROR; } auto close_session = rcpputils::make_scope_exit( - [context]() { - z_close(z_move(context->impl->session)); + [&session]() { + z_close(z_move(session)); }); - /// Initialize the graph cache. - z_id_t zid = z_info_zid(z_loan(context->impl->session)); - context->impl->graph_cache = std::make_unique(zid); - + // TODO(Yadunund) Move this check into a separate thread. // Verify if the zenoh router is running if configured. const std::optional configured_connection_attempts = rmw_zenoh_cpp::zenoh_router_check_attempts(); @@ -217,7 +138,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) uint64_t connection_attempts = 0; // Retry until the connection is successful. while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { ++connection_attempts; } std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -231,6 +152,8 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) } // Initialize the shm manager if shared_memory is enabled in the config. + std::optional shm_manager = std::nullopt; + const z_id_t zid = z_info_zid(z_loan(session)); if (shm_enabled._cstr != nullptr && strcmp(shm_enabled._cstr, "true") == 0) { @@ -242,148 +165,94 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) idstr[sizeof(zid.id) * 2] = '\0'; // TODO(yadunund): Can we get the size of the shm from the config even though it's not // a standard parameter? - context->impl->shm_manager = + shm_manager = zc_shm_manager_new( - z_loan(context->impl->session), + z_loan(session), idstr, SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!context->impl->shm_manager.has_value() || - !zc_shm_manager_check(&context->impl->shm_manager.value())) + if (!shm_manager.has_value() || + !zc_shm_manager_check(&shm_manager.value())) { RMW_SET_ERROR_MSG("Unable to create shm manager."); return RMW_RET_ERROR; } } auto free_shm_manager = rcpputils::make_scope_exit( - [context]() { - if (context->impl->shm_manager.has_value()) { - z_drop(z_move(context->impl->shm_manager.value())); + [&shm_manager]() { + if (shm_manager.has_value()) { + z_drop(z_move(shm_manager.value())); } }); // Initialize the guard condition. - context->impl->graph_guard_condition = + rmw_guard_condition_t * graph_guard_condition = static_cast(allocator->zero_allocate( 1, sizeof(rmw_guard_condition_t), allocator->state)); RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->graph_guard_condition, + graph_guard_condition, "failed to allocate graph guard condition", return RMW_RET_BAD_ALLOC); auto free_guard_condition = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + [graph_guard_condition, allocator]() { + allocator->deallocate(graph_guard_condition, allocator->state); }); - - context->impl->graph_guard_condition->implementation_identifier = + graph_guard_condition->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - - context->impl->graph_guard_condition->data = + graph_guard_condition->data = allocator->zero_allocate(1, sizeof(rmw_zenoh_cpp::GuardCondition), allocator->state); RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->graph_guard_condition->data, + graph_guard_condition->data, "failed to allocate graph guard condition data", return RMW_RET_BAD_ALLOC); auto free_guard_condition_data = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + [graph_guard_condition, allocator]() { + allocator->deallocate(graph_guard_condition->data, allocator->state); }); - RMW_TRY_PLACEMENT_NEW( - context->impl->graph_guard_condition->data, - context->impl->graph_guard_condition->data, + graph_guard_condition->data, + graph_guard_condition->data, return RMW_RET_BAD_ALLOC, rmw_zenoh_cpp::GuardCondition); auto destruct_guard_condition_data = rcpputils::make_scope_exit( - [context]() { + [graph_guard_condition]() { auto gc_data = - static_cast(context->impl->graph_guard_condition->data); + static_cast(graph_guard_condition->data); RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( gc_data->~GuardCondition(), rmw_zenoh_cpp::GuardCondition); }); - // Setup liveliness subscriptions for discovery. - const std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - context->actual_domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) - { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - context->impl->graph_cache->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - printf("[discovery] Received an error\n"); - } - } - z_drop(z_move(reply)); - z_drop(z_move(channel)); - - // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. - - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // auto sub_options = z_subscriber_options_default(); - // sub_options.reliability = Z_RELIABILITY_RELIABLE; - // context->impl->graph_subscriber = z_declare_subscriber( - // z_loan(context->impl->session), - // z_keyexpr(liveliness_str.c_str()), - // z_move(callback), - // &sub_options); - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = zc_liveliness_declare_subscriber( - z_loan(context->impl->session), - z_keyexpr(liveliness_str.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [context]() { - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + + // Create the context impl. + context->impl = static_cast( + allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + context->impl, + "failed to allocate context impl", + return RMW_RET_BAD_ALLOC); + auto free_impl = rcpputils::make_scope_exit( + [context, allocator]() { + allocator->deallocate(context->impl, allocator->state); }); - if (!z_check(context->impl->graph_subscriber)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; - } - undeclare_z_sub.cancel(); + RMW_TRY_PLACEMENT_NEW( + context->impl, + context->impl, + return RMW_RET_BAD_ALLOC, + rmw_context_impl_t, + allocator, + context->actual_domain_id, + std::string(options->enclave), + std::move(session), + std::move(shm_manager), + graph_guard_condition + ); + close_session.cancel(); destruct_guard_condition_data.cancel(); - impl_destructor.cancel(); free_guard_condition_data.cancel(); free_guard_condition.cancel(); - free_enclave.cancel(); free_options.cancel(); - impl_destructor.cancel(); free_impl.cancel(); free_shm_manager.cancel(); restore_context.cancel(); @@ -407,19 +276,10 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - if (context->impl->shm_manager.has_value()) { - z_drop(z_move(context->impl->shm_manager.value())); - } - // Close the zenoh session - if (z_close(z_move(context->impl->session)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } - - context->impl->is_shutdown = true; + rmw_context_impl_t * context_impl = static_cast(context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return RMW_RET_OK; + return context_impl->shutdown(); } //============================================================================== @@ -437,24 +297,13 @@ rmw_context_fini(rmw_context_t * context) context->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - if (!context->impl->is_shutdown) { + if (!context->impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has not been shutdown"); return RMW_RET_INVALID_ARGUMENT; } const rcutils_allocator_t * allocator = &context->options.allocator; - RMW_TRY_DESTRUCTOR( - static_cast( - context->impl->graph_guard_condition->data)->~GuardCondition(), - rmw_zenoh_cpp::GuardCondition, ); - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); - - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); - context->impl->graph_guard_condition = nullptr; - - allocator->deallocate(context->impl->enclave, allocator->state); - RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t *, ); allocator->deallocate(context->impl, allocator->state); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3de285a5..4678ff08 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,6 +36,7 @@ #include "detail/logging_macros.hpp" #include "detail/message_type_support.hpp" #include "detail/qos.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" @@ -210,11 +211,7 @@ rmw_create_node( context->impl, "expected initialized context", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->enclave, - "expected initialized enclave", - return nullptr); - if (context->impl->is_shutdown) { + if (context->impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has been shutdown"); return nullptr; } @@ -297,13 +294,14 @@ rmw_create_node( // Initialize liveliness token for the node to advertise that a new node is in town. node_data->id = context->impl->get_next_entity_id(); + z_session_t session = context->impl->session(); node_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string(node_data->id), rmw_zenoh_cpp::liveliness::EntityType::Node, rmw_zenoh_cpp::liveliness::NodeInfo{context->actual_domain_id, namespace_, name, - context->impl->enclave}); + context->impl->enclave()}); if (node_data->entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -311,7 +309,7 @@ rmw_create_node( return nullptr; } node_data->token = zc_liveliness_declare_token( - z_loan(context->impl->session), + session, z_keyexpr(node_data->entity->keyexpr().c_str()), NULL ); @@ -388,7 +386,7 @@ rmw_node_get_graph_guard_condition(const rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node->context, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, nullptr); - return node->context->impl->graph_guard_condition; + return node->context->impl->graph_guard_condition(); } //============================================================================== @@ -501,11 +499,7 @@ rmw_create_publisher( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -628,13 +622,14 @@ rmw_create_publisher( return nullptr; } + z_session_t session = context_impl->session(); // Create a Publication Cache if durability is transient_local. if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; publisher_data->pub_cache = ze_declare_publication_cache( - z_loan(context_impl->session), + session, z_loan(keyexpr), &pub_cache_opts ); @@ -660,7 +655,7 @@ rmw_create_publisher( } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( - z_loan(context_impl->session), + session, z_loan(keyexpr), &opts ); @@ -674,13 +669,13 @@ rmw_create_publisher( }); publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Publisher, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_publisher->topic_name, publisher_data->type_support->get_name(), @@ -694,7 +689,7 @@ rmw_create_publisher( return nullptr; } publisher_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(publisher_data->entity->keyexpr().c_str()), NULL ); @@ -932,15 +927,15 @@ rmw_publish( }); // Get memory from SHM buffer if available. - if (publisher_data->context->impl->shm_manager.has_value() && - zc_shm_manager_check(&publisher_data->context->impl->shm_manager.value())) + if (publisher_data->context->impl->shm_manager().has_value() && + zc_shm_manager_check(&publisher_data->context->impl->shm_manager().value())) { shmbuf = zc_shm_alloc( - &publisher_data->context->impl->shm_manager.value(), + &publisher_data->context->impl->shm_manager().value(), max_data_length); if (!z_check(shmbuf.value())) { - zc_shm_gc(&publisher_data->context->impl->shm_manager.value()); - shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager.value(), max_data_length); + zc_shm_gc(&publisher_data->context->impl->shm_manager().value()); + shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager().value(), max_data_length); if (!z_check(shmbuf.value())) { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); @@ -1048,7 +1043,7 @@ rmw_publisher_count_matched_subscriptions( rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache->publisher_count_matched_subscriptions( + return context_impl->publisher_count_matched_subscriptions( publisher, subscription_count); } @@ -1332,11 +1327,7 @@ rmw_create_subscription( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -1465,6 +1456,7 @@ rmw_create_subscription( // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. + z_session_t session = context_impl->session(); z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); auto always_drop_keystr = rcpputils::make_scope_exit( [&owned_key_str]() { @@ -1483,7 +1475,7 @@ rmw_create_subscription( sub_options.reliability = Z_RELIABILITY_RELIABLE; } sub_data->sub = ze_declare_querying_subscriber( - z_loan(context_impl->session), + session, z_loan(keyexpr), z_move(callback), &sub_options @@ -1499,7 +1491,7 @@ rmw_create_subscription( sub_options.reliability = Z_RELIABILITY_RELIABLE; } sub_data->sub = z_declare_subscriber( - z_loan(context_impl->session), + session, z_loan(keyexpr), z_move(callback), &sub_options @@ -1526,13 +1518,13 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Subscription, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_subscription->topic_name, sub_data->type_support->get_name(), @@ -1546,7 +1538,7 @@ rmw_create_subscription( return nullptr; } sub_data->token = zc_liveliness_declare_token( - z_loan(context_impl->session), + session, z_keyexpr(sub_data->entity->keyexpr().c_str()), NULL ); @@ -1654,7 +1646,7 @@ rmw_subscription_count_matched_publishers( rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache->subscription_count_matched_publishers( + return context_impl->subscription_count_matched_publishers( subscription, publisher_count); } @@ -1867,7 +1859,7 @@ rmw_take_sequence( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown) { + if (sub_data->context->impl->is_shutdown()) { return RMW_RET_OK; } @@ -1927,7 +1919,7 @@ __rmw_take_serialized( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown) { + if (sub_data->context->impl->is_shutdown()) { return RMW_RET_OK; } @@ -2082,11 +2074,7 @@ rmw_create_client( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -2285,14 +2273,15 @@ rmw_create_client( return nullptr; } + z_session_t session = context_impl->session(); client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Client, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_client->service_name, std::move(service_type), @@ -2306,7 +2295,7 @@ rmw_create_client( return nullptr; } client_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(client_data->entity->keyexpr().c_str()), NULL ); @@ -2497,7 +2486,7 @@ rmw_send_request( z_owned_closure_reply_t zn_closure_reply = z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( - z_loan(context_impl->session), + context_impl->session(), z_loan(client_data->keyexpr), "", z_move(zn_closure_reply), &opts); @@ -2686,11 +2675,7 @@ rmw_create_service( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -2865,6 +2850,7 @@ rmw_create_service( return nullptr; } + z_session_t session = context_impl->session(); z_owned_closure_query_t callback = z_closure( rmw_zenoh_cpp::service_data_handler, nullptr, service_data); @@ -2872,7 +2858,7 @@ rmw_create_service( z_queryable_options_t qable_options = z_queryable_options_default(); qable_options.complete = true; service_data->qable = z_declare_queryable( - z_loan(context_impl->session), + session, z_loan(service_data->keyexpr), z_move(callback), &qable_options); @@ -2887,13 +2873,13 @@ rmw_create_service( }); service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Service, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_service->service_name, std::move(service_type), @@ -2907,7 +2893,7 @@ rmw_create_service( return nullptr; } service_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(service_data->entity->keyexpr().c_str()), NULL ); @@ -3658,7 +3644,7 @@ rmw_get_node_names( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_node_names( + return node->context->impl->get_node_names( node_names, node_namespaces, nullptr, allocator); } @@ -3681,7 +3667,7 @@ rmw_get_node_names_with_enclaves( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_node_names( + return node->context->impl->get_node_names( node_names, node_namespaces, enclaves, allocator); } @@ -3712,7 +3698,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_publishers(topic_name, count); + return node->context->impl->count_publishers(topic_name, count); } //============================================================================== @@ -3742,7 +3728,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_subscriptions(topic_name, count); + return node->context->impl->count_subscriptions(topic_name, count); } //============================================================================== @@ -3772,7 +3758,7 @@ rmw_count_clients( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_clients(service_name, count); + return node->context->impl->count_clients(service_name, count); } //============================================================================== @@ -3802,7 +3788,7 @@ rmw_count_services( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_services(service_name, count); + return node->context->impl->count_services(service_name, count); } //============================================================================== @@ -3902,7 +3888,7 @@ rmw_service_server_is_available( return RMW_RET_INVALID_ARGUMENT; } - return node->context->impl->graph_cache->service_server_is_available( + return node->context->impl->service_server_is_available( client->service_name, service_type.c_str(), is_available); }