diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index ed7fa87b..374af78e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -14,391 +14,531 @@ #include "rmw_context_impl_s.hpp" +#include +#include +#include +#include #include #include +#include #include #include +#include #include +#include "graph_cache.hpp" #include "guard_condition.hpp" #include "identifier.hpp" -#include "liveliness_utils.hpp" #include "logging_macros.hpp" +#include "rmw_node_data.hpp" #include "zenoh_config.hpp" #include "zenoh_router_check.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" -#include "rmw/impl/cpp/macros.hpp" // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -///============================================================================= -void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) +// This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler() +// to lookup the pointer, and gain a reference to a shared_ptr if it exists. +// This guarantees that the Data object will not be destroyed while we are using it. +static std::mutex data_to_data_shared_ptr_map_mutex; +static std::unordered_map> data_to_data_shared_ptr_map; + +static void graph_sub_data_handler(const z_sample_t * sample, void * data); + +// Bundle all class members into a data struct which can be passed as a +// weak ptr to various threads for thread-safe access without capturing +// "this" ptr by reference. +class rmw_context_impl_s::Data final { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); +public: + // Constructor. + Data( + std::size_t domain_id, + const std::string & enclave) + : domain_id_(std::move(domain_id)), + enclave_(std::move(enclave)), + is_shutdown_(false), + next_entity_id_(0), + nodes_({}) + { + // Initialize the zenoh configuration. + z_owned_config_t config; + rmw_ret_t ret; + if ((ret = + rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session, + &config)) != RMW_RET_OK) + { + throw std::runtime_error("Error configuring Zenoh session."); + } - auto data_ptr = static_cast(data); - if (data_ptr == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Invalid data_ptr." - ); - return; - } + // Check if shm is enabled. + z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); + auto always_free_shm_enabled = rcpputils::make_scope_exit( + [&shm_enabled]() { + z_drop(z_move(shm_enabled)); + }); + + // Initialize the zenoh session. + session_ = z_open(z_move(config)); + if (!z_session_check(&session_)) { + throw std::runtime_error("Error setting up zenoh session."); + } + auto close_session = rcpputils::make_scope_exit( + [this]() { + z_close(z_move(session_)); + }); + + // TODO(Yadunund) Move this check into a separate thread. + // Verify if the zenoh router is running if configured. + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + ret = RMW_RET_ERROR; + uint64_t connection_attempts = 0; + // Retry until the connection is successful. 
+      while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) {
+        if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) {
+          ++connection_attempts;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+      }
+      if (ret != RMW_RET_OK) {
+        throw std::runtime_error(
+                "Unable to connect to a Zenoh router after " +
+                std::to_string(configured_connection_attempts.value()) +
+                " retries.");
+      }
+    }
-  // Update the graph cache.
-  std::lock_guard<std::recursive_mutex> lock(data_ptr->mutex_);
-  if (data_ptr->is_shutdown_) {
-    return;
-  }
-  switch (sample->kind) {
-    case z_sample_kind_t::Z_SAMPLE_KIND_PUT:
-      data_ptr->graph_cache_->parse_put(keystr._cstr);
-      break;
-    case z_sample_kind_t::Z_SAMPLE_KIND_DELETE:
-      data_ptr->graph_cache_->parse_del(keystr._cstr);
-      break;
-    default:
-      return;
-  }
+    // Initialize the graph cache.
+    const z_id_t zid = z_info_zid(z_loan(session_));
+    graph_cache_ = std::make_shared<rmw_zenoh_cpp::GraphCache>(zid);
+    // Setup liveliness subscriptions for discovery.
+    std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id);
+
+    // Query router/liveliness participants to get graph information before the session was started.
+    // We create a blocking channel that is unbounded, i.e. `bound` = 0, to receive
+    // replies for the zc_liveliness_get() call. This is necessary as if the `bound`
+    // is too low, the channel may starve the zenoh executor of its threads which
+    // would lead to deadlocks when trying to receive replies and block the
+    // execution here.
+    // The blocking channel will return when the sender end is closed which is
+    // the moment the query finishes.
+    // The non-blocking fifo exists only for the use case where we don't want to
+    // block the thread between responses (including the request termination response).
+    // In general, unless we want to cooperatively schedule other tasks on the same
+    // thread as reading the fifo, the blocking fifo will be more appropriate as
+    // the code will be simpler, and if we're just going to spin over the non-blocking
+    // reads until we obtain responses, we'll just be hogging CPU time by convincing
+    // the OS that we're doing actual work when it could instead park the thread.
+    z_owned_reply_channel_t channel = zc_reply_fifo_new(0);
+    zc_liveliness_get(
+      z_loan(session_), z_keyexpr(liveliness_str.c_str()),
+      z_move(channel.send), NULL);
+    z_owned_reply_t reply = z_reply_null();
+    for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply);
+      call_success = z_call(channel.recv, &reply))
+    {
+      if (!call_success) {
+        continue;
+      }
+      if (z_reply_is_ok(&reply)) {
+        z_sample_t sample = z_reply_ok(&reply);
+        z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
+        // Ignore tokens from the same session to avoid race conditions from this
+        // query and the liveliness subscription.
+        graph_cache_->parse_put(z_loan(keystr), true);
+        z_drop(z_move(keystr));
+      } else {
+        RMW_ZENOH_LOG_DEBUG_NAMED(
+          "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
+      }
+    }
+    z_drop(z_move(reply));
+    z_drop(z_move(channel));
-  // Trigger the ROS graph guard condition.
-  rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_.get());
-  if (RMW_RET_OK != rmw_ret) {
-    RMW_ZENOH_LOG_WARN_NAMED(
-      "rmw_zenoh_cpp",
-      "[graph_sub_data_handler] Unable to trigger graph guard condition."
-    );
+    // Initialize the shm manager if shared_memory is enabled in the config.
+    shm_manager_ = std::nullopt;
+    if (shm_enabled._cstr != nullptr &&
+      strcmp(shm_enabled._cstr, "true") == 0)
+    {
+      char idstr[sizeof(zid.id) * 2 + 1];  // 2 bytes for each byte of the id, plus the trailing \0
+      static constexpr size_t max_size_of_each = 3;  // 2 for each byte, plus the trailing \0
+      for (size_t i = 0; i < sizeof(zid.id); ++i) {
+        snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]);
+      }
+      idstr[sizeof(zid.id) * 2] = '\0';
+      // TODO(yadunund): Can we get the size of the shm from the config even though it's not
+      // a standard parameter?
+      shm_manager_ =
+        zc_shm_manager_new(
+        z_loan(session_),
+        idstr,
+        SHM_BUFFER_SIZE_MB * 1024 * 1024);
+      if (!shm_manager_.has_value() ||
+        !zc_shm_manager_check(&shm_manager_.value()))
+      {
+        throw std::runtime_error("Unable to create shm manager.");
+      }
+    }
+    auto free_shm_manager = rcpputils::make_scope_exit(
+      [this]() {
+        if (shm_manager_.has_value()) {
+          z_drop(z_move(shm_manager_.value()));
+        }
+      });
+
+    graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
+    graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
+    graph_guard_condition_->data = &guard_condition_data_;
+
+    // Setup the liveliness subscriber to receive updates from the ROS graph
+    // and update the graph cache.
+    auto sub_options = zc_liveliness_subscriber_options_null();
+    z_owned_closure_sample_t callback = z_closure(
+      graph_sub_data_handler, nullptr, this);
+    graph_subscriber_ = zc_liveliness_declare_subscriber(
+      z_loan(session_),
+      z_keyexpr(liveliness_str.c_str()),
+      z_move(callback),
+      &sub_options);
+    zc_liveliness_subscriber_options_drop(z_move(sub_options));
+    auto undeclare_z_sub = rcpputils::make_scope_exit(
+      [this]() {
+        z_undeclare_subscriber(z_move(this->graph_subscriber_));
+      });
+    if (!z_check(graph_subscriber_)) {
+      RMW_SET_ERROR_MSG("unable to create zenoh subscription");
+      throw std::runtime_error("Unable to subscribe to ROS graph updates.");
+    }
+
+    close_session.cancel();
+    free_shm_manager.cancel();
+    undeclare_z_sub.cancel();
+  }
-}
-///=============================================================================
-rmw_context_impl_s::Data::Data(
-  std::size_t domain_id,
-  const std::string & enclave,
-  z_owned_session_t session,
-  std::optional<zc_owned_shm_manager_t> shm_manager,
-  const std::string & liveliness_str,
-  std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
-: enclave_(std::move(enclave)),
-  domain_id_(std::move(domain_id)),
-  session_(std::move(session)),
-  shm_manager_(std::move(shm_manager)),
-  liveliness_str_(std::move(liveliness_str)),
-  graph_cache_(std::move(graph_cache)),
-  is_shutdown_(false),
-  next_entity_id_(0),
-  is_initialized_(false),
-  nodes_({})
-{
-  graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
-  graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
-  graph_guard_condition_->data = &guard_condition_data_;
-}
+  // Shutdown the Zenoh session.
+  rmw_ret_t shutdown()
+  {
+    {
+      std::lock_guard<std::recursive_mutex> lock(mutex_);
+      rmw_ret_t ret = RMW_RET_OK;
+      if (is_shutdown_) {
+        return ret;
+      }
-///=============================================================================
-rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
-{
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  if (is_initialized_) {
+      // Shutdown all the nodes in this context.
+      for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
+        ret = node_it->second->shutdown();
+        if (ret != RMW_RET_OK) {
+          RMW_ZENOH_LOG_ERROR_NAMED(
+            "rmw_zenoh_cpp",
+            "Unable to shutdown node with id %zu. 
rmw_ret_t code: %zu.", + node_it->second->id(), + ret + ); + } + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + is_shutdown_ = true; + + // We specifically do *not* hold the mutex_ while tearing down the session; this allows us + // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). + } + + // Close the zenoh session + if (z_close(z_move(session_)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } - // Setup the liveliness subscriber to receives updates from the ROS graph - // and update the graph cache. - // TODO(Yadunund): This closure is still not 100% thread safe as we are - // passing Data* as the type erased argument to z_closure. Thus during - // the execution of graph_sub_data_handler, the rawptr may be freed/reset - // by a different thread. When we switch to zenoh-cpp we can replace z_closure - // with a lambda that captures a weak_ptr by copy. The lambda and caputed - // weak_ptr will have the same lifetime as the subscriber. Then within - // graph_sub_data_handler, we would first lock to weak_ptr to check if the - // shared_ptr exits. If it does, then even if a different thread calls - // rmw_context_fini() to destroy rmw_context_impl_s, the locked - // shared_ptr would live on until the graph_sub_data_handler callback. - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure( - rmw_context_impl_s::graph_sub_data_handler, nullptr, this); - graph_subscriber_ = zc_liveliness_declare_subscriber( - z_loan(session_), - z_keyexpr(liveliness_str_.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - if (!z_check(graph_subscriber_)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; + + std::string enclave() const + { + std::lock_guard lock(mutex_); + return enclave_; } - undeclare_z_sub.cancel(); - is_initialized_ = true; - return RMW_RET_OK; -} + z_session_t session() const + { + std::lock_guard lock(mutex_); + return z_loan(session_); + } -///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::shutdown() -{ - std::lock_guard lock(mutex_); - rmw_ret_t ret = RMW_RET_OK; - if (is_shutdown_) { - return ret; + std::optional & shm_manager() + { + std::lock_guard lock(mutex_); + return shm_manager_; } - // Shutdown all the nodes in this context. - for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { - ret = node_it->second->shutdown(); - if (ret != RMW_RET_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to shutdown node with id %zu. 
rmw_ret_t code: %zu.", - node_it->second->id(), - ret - ); - } + rmw_guard_condition_t * graph_guard_condition() + { + std::lock_guard lock(mutex_); + return graph_guard_condition_.get(); } - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_manager_.has_value()) { - z_drop(z_move(shm_manager_.value())); + std::size_t get_next_entity_id() + { + std::lock_guard lock(mutex_); + return next_entity_id_++; } - // Close the zenoh session - if (z_close(z_move(session_)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; + + bool is_shutdown() const + { + std::lock_guard lock(mutex_); + return is_shutdown_; } - is_shutdown_ = true; - return RMW_RET_OK; -} -///============================================================================= -rmw_context_impl_s::Data::~Data() -{ - auto ret = this->shutdown(); - nodes_.clear(); - static_cast(ret); -} + bool session_is_valid() const + { + std::lock_guard lock(mutex_); + return z_check(session_); + } -///============================================================================= -rmw_context_impl_s::rmw_context_impl_s( - const std::size_t domain_id, - const std::string & enclave) -{ - // Initialize the zenoh configuration. - z_owned_config_t config; - rmw_ret_t ret; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) + std::shared_ptr graph_cache() { - throw std::runtime_error("Error configuring Zenoh session."); + std::lock_guard lock(mutex_); + return graph_cache_; } - // Check if shm is enabled. - z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto always_free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); + bool create_node_data( + const rmw_node_t * const node, + const std::string & ns, + const std::string & node_name) + { + std::lock_guard lock(mutex_); + if (nodes_.count(node) > 0) { + // Node already exists. + return false; + } - // Initialize the zenoh session. - z_owned_session_t session = z_open(z_move(config)); - if (!z_session_check(&session)) { - throw std::runtime_error("Error setting up zenoh session."); - } - auto close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_move(session)); - }); + // Check that the Zenoh session is still valid. + if (!z_check(session_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create NodeData as Zenoh session is invalid."); + return false; + } - // TODO(Yadunund) Move this check into a separate thread. - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto node_data = rmw_zenoh_cpp::NodeData::make( + node, + this->get_next_entity_id(), + z_loan(session_), + domain_id_, + ns, + node_name, + enclave_); + if (node_data == nullptr) { + // Error already handled. 
+ return false; } - if (ret != RMW_RET_OK) { - throw std::runtime_error( - "Unable to connect to a Zenoh router after " + - std::to_string(configured_connection_attempts.value()) + - " retries."); + + auto node_insertion = nodes_.insert(std::make_pair(node, std::move(node_data))); + if (!node_insertion.second) { + return false; } + + return true; } - // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session)); - auto graph_cache = std::make_shared(zid); - // Setup liveliness subscriptions for discovery. - std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) + std::shared_ptr get_node_data(const rmw_node_t * const node) { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); + std::lock_guard lock(mutex_); + auto node_it = nodes_.find(node); + if (node_it == nodes_.end()) { + return nullptr; } + return node_it->second; } - z_drop(z_move(reply)); - z_drop(z_move(channel)); - // Initialize the shm manager if shared_memory is enabled in the config. 
- std::optional shm_manager = std::nullopt; - if (shm_enabled._cstr != nullptr && - strcmp(shm_enabled._cstr, "true") == 0) + void delete_node_data(const rmw_node_t * const node) { - char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 - static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(zid.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + std::lock_guard lock(mutex_); + nodes_.erase(node); + } + + void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr) + { + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return; } - idstr[sizeof(zid.id) * 2] = '\0'; - // TODO(yadunund): Can we get the size of the shm from the config even though it's not - // a standard parameter? - shm_manager = - zc_shm_manager_new( - z_loan(session), - idstr, - SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!shm_manager.has_value() || - !zc_shm_manager_check(&shm_manager.value())) - { - throw std::runtime_error("Unable to create shm manager."); + switch (sample_kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + graph_cache_->parse_put(keystr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + graph_cache_->parse_del(keystr); + break; + default: + return; + } + + // Trigger the ROS graph guard condition. + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(graph_guard_condition_.get()); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); } } - auto free_shm_manager = rcpputils::make_scope_exit( - [&shm_manager]() { - if (shm_manager.has_value()) { - z_drop(z_move(shm_manager.value())); - } - }); - close_session.cancel(); - free_shm_manager.cancel(); + // Destructor. + ~Data() + { + auto ret = this->shutdown(); + nodes_.clear(); + static_cast(ret); + } + +private: + // Mutex to lock when accessing members. + mutable std::recursive_mutex mutex_; + // The ROS domain id of this context. + std::size_t domain_id_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager_; + // Graph cache. + std::shared_ptr graph_cache_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition. + // Guard condition that should be triggered when the graph changes. + std::unique_ptr graph_guard_condition_; + // The GuardCondition data structure. + rmw_zenoh_cpp::GuardCondition guard_condition_data_; + // Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + std::size_t next_entity_id_; + // Nodes created from this context. 
+ std::unordered_map> nodes_; +}; + +///============================================================================= +static void graph_sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); - data_ = std::make_shared( - domain_id, - std::move(enclave), - std::move(session), - std::move(shm_manager), - std::move(liveliness_str), - std::move(graph_cache)); + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Invalid data_ptr." + ); + return; + } - ret = data_->subscribe_to_ros_graph(); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Unable to subscribe to ROS Graph updates."); + // Look up the data shared_ptr in the global map. If it is in there, use it. + // If not, it is being shutdown so we can just ignore this update. + std::shared_ptr data_shared_ptr{nullptr}; + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + if (data_to_data_shared_ptr_map.count(data_ptr) == 0) { + return; + } + data_shared_ptr = data_to_data_shared_ptr_map[data_ptr]; } + + // Update the graph cache. + data_shared_ptr->update_graph_cache(sample->kind, keystr._cstr); +} + +///============================================================================= +rmw_context_impl_s::rmw_context_impl_s( + const std::size_t domain_id, + const std::string & enclave) +{ + data_ = std::make_shared(domain_id, std::move(enclave)); + + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.emplace(data_.get(), data_); +} + +///============================================================================= +rmw_context_impl_s::~rmw_context_impl_s() +{ + this->shutdown(); } ///============================================================================= std::string rmw_context_impl_s::enclave() const { - std::lock_guard lock(data_->mutex_); - return data_->enclave_; + return data_->enclave(); } ///============================================================================= z_session_t rmw_context_impl_s::session() const { - std::lock_guard lock(data_->mutex_); - return z_loan(data_->session_); + return data_->session(); } ///============================================================================= std::optional & rmw_context_impl_s::shm_manager() { - std::lock_guard lock(data_->mutex_); - return data_->shm_manager_; + return data_->shm_manager(); } ///============================================================================= rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() { - std::lock_guard lock(data_->mutex_); - return data_->graph_guard_condition_.get(); + return data_->graph_guard_condition(); } ///============================================================================= std::size_t rmw_context_impl_s::get_next_entity_id() { - std::lock_guard lock(data_->mutex_); - return data_->next_entity_id_++; + return data_->get_next_entity_id(); } ///============================================================================= rmw_ret_t rmw_context_impl_s::shutdown() { + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + data_to_data_shared_ptr_map.erase(data_.get()); + } + return data_->shutdown(); } ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { - std::lock_guard lock(data_->mutex_); - return data_->is_shutdown_; + return 
data_->is_shutdown(); } ///============================================================================= bool rmw_context_impl_s::session_is_valid() const { - std::lock_guard lock(data_->mutex_); - return z_check(data_->session_); + return data_->session_is_valid(); } ///============================================================================= std::shared_ptr rmw_context_impl_s::graph_cache() { - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_; + return data_->graph_cache(); } ///============================================================================= @@ -407,56 +547,18 @@ bool rmw_context_impl_s::create_node_data( const std::string & ns, const std::string & node_name) { - std::lock_guard lock(data_->mutex_); - if (data_->nodes_.count(node) > 0) { - // Node already exists. - return false; - } - - // Check that the Zenoh session is still valid. - if (!z_check(data_->session_)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create NodeData as Zenoh session is invalid."); - return false; - } - - auto node_data = rmw_zenoh_cpp::NodeData::make( - node, - this->get_next_entity_id(), - z_loan(data_->session_), - data_->domain_id_, - ns, - node_name, - data_->enclave_); - if (node_data == nullptr) { - // Error already handled. - return false; - } - - auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data))); - if (!node_insertion.second) { - return false; - } - - return true; + return data_->create_node_data(node, ns, node_name); } ///============================================================================= std::shared_ptr rmw_context_impl_s::get_node_data( const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - auto node_it = data_->nodes_.find(node); - if (node_it == data_->nodes_.end()) { - return nullptr; - } - return node_it->second; + return data_->get_node_data(node); } ///============================================================================= void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node) { - std::lock_guard lock(data_->mutex_); - data_->nodes_.erase(node); + data_->delete_node_data(node); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 884056e3..a35b9ffe 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -17,20 +17,16 @@ #include -# include +#include #include -#include #include #include -#include #include "graph_cache.hpp" -#include "guard_condition.hpp" -#include "liveliness_utils.hpp" #include "rmw_node_data.hpp" -#include "rcutils/types.h" -#include "rmw/rmw.h" +#include "rmw/ret_types.h" +#include "rmw/types.h" ///============================================================================= class rmw_context_impl_s final @@ -46,6 +42,8 @@ class rmw_context_impl_s final const std::size_t domain_id, const std::string & enclave); + ~rmw_context_impl_s(); + // Get a copy of the enclave. std::string enclave() const; @@ -93,67 +91,11 @@ class rmw_context_impl_s final /// Delete the NodeData for a given rmw_node_t if present. void delete_node_data(const rmw_node_t * const node); -private: - // Bundle all class members into a data struct which can be passed as a - // weak ptr to various threads for thread-safe access without capturing - // "this" ptr by reference. - struct Data : public std::enable_shared_from_this - { - // Constructor. 
-    Data(
-      std::size_t domain_id,
-      const std::string & enclave,
-      z_owned_session_t session,
-      std::optional<zc_owned_shm_manager_t> shm_manager,
-      const std::string & liveliness_str,
-      std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache);
-
-    // Subscribe to the ROS graph.
-    rmw_ret_t subscribe_to_ros_graph();
-
-    // Shutdown the Zenoh session.
-    rmw_ret_t shutdown();
-
-    // Destructor.
-    ~Data();
-
-    // Mutex to lock when accessing members.
-    mutable std::recursive_mutex mutex_;
-    // RMW allocator.
-    const rcutils_allocator_t * allocator_;
-    // Enclave, name used to find security artifacts in a sros2 keystore.
-    std::string enclave_;
-    // The ROS domain id of this context.
-    std::size_t domain_id_;
-    // An owned session.
-    z_owned_session_t session_;
-    // An optional SHM manager that is initialized of SHM is enabled in the
-    // zenoh session config.
-    std::optional<zc_owned_shm_manager_t> shm_manager_;
-    // Liveliness keyexpr string to subscribe to for ROS graph changes.
-    std::string liveliness_str_;
-    // Graph cache.
-    std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
-    // ROS graph liveliness subscriber.
-    z_owned_subscriber_t graph_subscriber_;
-    // Equivalent to rmw_dds_common::Context's guard condition.
-    // Guard condition that should be triggered when the graph changes.
-    std::unique_ptr<rmw_guard_condition_t> graph_guard_condition_;
-    // The GuardCondition data structure.
-    rmw_zenoh_cpp::GuardCondition guard_condition_data_;
-    // Shutdown flag.
-    bool is_shutdown_;
-    // A counter to assign a local id for every entity created in this session.
-    std::size_t next_entity_id_;
-    // True once graph subscriber is initialized.
-    bool is_initialized_;
-    // Nodes created from this context.
-    std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
-  };
+  // Forward declaration
+  class Data;
+
+private:
   std::shared_ptr<Data> data_{nullptr};
-
-  static void graph_sub_data_handler(const z_sample_t * sample, void * data);
 };