diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index c548a6a7..95aab3d6 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/qos.cpp src/detail/rmw_context_impl_s.cpp src/detail/rmw_data_types.cpp + src/detail/rmw_node_data.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp src/detail/type_support_common.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index d60bece8..875f7c09 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -54,7 +54,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void } // Update the graph cache. - std::lock_guard lock(data_ptr->mutex_); + std::lock_guard lock(data_ptr->mutex_); if (data_ptr->is_shutdown_) { return; } @@ -81,19 +81,22 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void ///============================================================================= rmw_context_impl_s::Data::Data( + std::size_t domain_id, const std::string & enclave, z_owned_session_t session, std::optional shm_manager, const std::string & liveliness_str, std::shared_ptr graph_cache) : enclave_(std::move(enclave)), + domain_id_(std::move(domain_id)), session_(std::move(session)), shm_manager_(std::move(shm_manager)), liveliness_str_(std::move(liveliness_str)), graph_cache_(std::move(graph_cache)), is_shutdown_(false), next_entity_id_(0), - is_initialized_(false) + is_initialized_(false), + nodes_({}) { graph_guard_condition_ = std::make_unique(); graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -103,7 +106,7 @@ rmw_context_impl_s::Data::Data( ///============================================================================= rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (is_initialized_) { return RMW_RET_OK; } @@ -145,7 +148,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() ///============================================================================= rmw_ret_t rmw_context_impl_s::Data::shutdown() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (is_shutdown_) { return RMW_RET_OK; } @@ -308,6 +311,7 @@ rmw_context_impl_s::rmw_context_impl_s( free_shm_manager.cancel(); data_ = std::make_shared( + domain_id, std::move(enclave), std::move(session), std::move(shm_manager), @@ -323,35 +327,35 @@ rmw_context_impl_s::rmw_context_impl_s( ///============================================================================= std::string rmw_context_impl_s::enclave() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->enclave_; } ///============================================================================= z_session_t rmw_context_impl_s::session() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return z_loan(data_->session_); } ///============================================================================= std::optional & rmw_context_impl_s::shm_manager() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->shm_manager_; } ///============================================================================= rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_guard_condition_.get(); } ///============================================================================= -size_t rmw_context_impl_s::get_next_entity_id() +std::size_t rmw_context_impl_s::get_next_entity_id() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->next_entity_id_++; } @@ -364,20 +368,79 @@ rmw_ret_t rmw_context_impl_s::shutdown() ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->is_shutdown_; } ///============================================================================= bool rmw_context_impl_s::session_is_valid() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return z_check(data_->session_); } ///============================================================================= std::shared_ptr rmw_context_impl_s::graph_cache() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_; } + +///============================================================================= +bool rmw_context_impl_s::create_node_data( + const rmw_node_t * const node, + const std::string & ns, + const std::string & node_name) +{ + std::lock_guard lock(data_->mutex_); + if (data_->nodes_.count(node) > 0) { + // Node already exists. + return false; + } + + // Check that the Zenoh session is still valid. + if (!z_check(data_->session_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create NodeData as Zenoh session is invalid."); + return false; + } + + auto node_data = rmw_zenoh_cpp::NodeData::make( + this->get_next_entity_id(), + z_loan(data_->session_), + data_->domain_id_, + ns, + node_name, + data_->enclave_); + if (node_data == nullptr) { + // Error already handled. + return false; + } + + auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data))); + if (!node_insertion.second) { + return false; + } + + return true; +} + +///============================================================================= +std::shared_ptr rmw_context_impl_s::get_node_data( + const rmw_node_t * const node) +{ + std::lock_guard lock(data_->mutex_); + auto node_it = data_->nodes_.find(node); + if (node_it == data_->nodes_.end()) { + return nullptr; + } + return node_it->second; +} + +///============================================================================= +void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node) +{ + std::lock_guard lock(data_->mutex_); + data_->nodes_.erase(node); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 3cb6f474..884056e3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -22,10 +22,12 @@ #include #include #include +#include #include "graph_cache.hpp" #include "guard_condition.hpp" #include "liveliness_utils.hpp" +#include "rmw_node_data.hpp" #include "rcutils/types.h" #include "rmw/rmw.h" @@ -34,7 +36,7 @@ class rmw_context_impl_s final { public: - // Constructor that internally initializees the Zenoh session and other artifacts. + // Constructor that internally initializes the Zenoh session and other artifacts. // Throws an std::runtime_error if any of the initializations fail. // The construction will block until a Zenoh router is detected. // TODO(Yadunund): Make this a non-blocking call by checking for the Zenoh @@ -62,7 +64,7 @@ class rmw_context_impl_s final rmw_guard_condition_t * graph_guard_condition(); // Get a unique id for a new entity. - size_t get_next_entity_id(); + std::size_t get_next_entity_id(); // Shutdown the Zenoh session. rmw_ret_t shutdown(); @@ -76,6 +78,21 @@ class rmw_context_impl_s final /// Return a shared_ptr to the GraphCache stored in this context. std::shared_ptr graph_cache(); + /// Create a NodeData and store it within this context. The NodeData can be + /// retrieved using get_node(). + /// Returns false if parameters are invalid. + bool create_node_data( + const rmw_node_t * const node, + const std::string & ns, + const std::string & node_name); + + /// Retrieve the NodeData for a given rmw_node_t if present. + std::shared_ptr get_node_data( + const rmw_node_t * const node); + + /// Delete the NodeData for a given rmw_node_t if present. + void delete_node_data(const rmw_node_t * const node); + private: // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing @@ -84,6 +101,7 @@ class rmw_context_impl_s final { // Constructor. Data( + std::size_t domain_id, const std::string & enclave, z_owned_session_t session, std::optional shm_manager, @@ -100,11 +118,13 @@ class rmw_context_impl_s final ~Data(); // Mutex to lock when accessing members. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // RMW allocator. const rcutils_allocator_t * allocator_; // Enclave, name used to find security artifacts in a sros2 keystore. std::string enclave_; + // The ROS domain id of this context. + std::size_t domain_id_; // An owned session. z_owned_session_t session_; // An optional SHM manager that is initialized of SHM is enabled in the @@ -124,9 +144,11 @@ class rmw_context_impl_s final // Shutdown flag. bool is_shutdown_; // A counter to assign a local id for every entity created in this session. - size_t next_entity_id_; + std::size_t next_entity_id_; // True once graph subscriber is initialized. bool is_initialized_; + // Nodes created from this context. + std::unordered_map> nodes_; }; std::shared_ptr data_{nullptr}; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 5ed480e7..e5097988 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -44,20 +44,6 @@ namespace rmw_zenoh_cpp { -///============================================================================= -struct rmw_node_data_t -{ - // The Entity generated for the node. - std::shared_ptr entity; - - // Liveliness token for the node. - zc_owned_liveliness_token_t token; - - // The entity id of this node as generated by get_next_entity_id(). - // Every interface created by this node will include this id in its liveliness token. - size_t id; -}; - ///============================================================================= class rmw_publisher_data_t final { diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp new file mode 100644 index 00000000..d80bd275 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -0,0 +1,108 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_node_data.hpp" + +#include +#include +#include +#include +#include + +#include "logging_macros.hpp" + +#include "rcpputils/scope_exit.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +std::shared_ptr NodeData::make( + std::size_t id, + z_session_t session, + std::size_t domain_id, + const std::string & namespace_, + const std::string & node_name, + const std::string & enclave) +{ + // Create the entity. + auto entity = rmw_zenoh_cpp::liveliness::Entity::make( + z_info_zid(session), + std::to_string(id), + std::to_string(id), + rmw_zenoh_cpp::liveliness::EntityType::Node, + rmw_zenoh_cpp::liveliness::NodeInfo{ + domain_id, + namespace_, + node_name, + enclave + } + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make NodeData as node entity is invalid."); + return nullptr; + } + + // Create the liveliness token. + zc_owned_liveliness_token_t token = zc_liveliness_declare_token( + session, + z_keyexpr(entity->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [&token]() { + z_drop(z_move(token)); + }); + if (!z_check(token)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the node."); + return nullptr; + } + free_token.cancel(); + + return std::shared_ptr( + new NodeData{ + id, + std::move(entity), + std::move(token) + }); +} + +///============================================================================= +NodeData::NodeData( + std::size_t id, + std::shared_ptr entity, + zc_owned_liveliness_token_t token) +: id_(std::move(id)), + entity_(std::move(entity)), + token_(std::move(token)) +{ + // Do nothing. +} + +///============================================================================= +NodeData::~NodeData() +{ + zc_liveliness_undeclare_token(z_move(token_)); +} + +///============================================================================= +std::size_t NodeData::id() const +{ + std::lock_guard lock(mutex_); + return id_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp new file mode 100644 index 00000000..8d11c34f --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -0,0 +1,67 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_NODE_DATA_HPP_ +#define DETAIL__RMW_NODE_DATA_HPP_ + +#include + +#include +#include +#include +#include + +#include "liveliness_utils.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +// The NodeData can only be created via rmw_context_impl_s::create_node_data(). +class NodeData final +{ +public: + // Make a shared_ptr of NodeData. Returns nullptr if construction fails. + static std::shared_ptr make( + std::size_t id, + z_session_t session, + std::size_t domain_id, + const std::string & namespace_, + const std::string & node_name, + const std::string & enclave); + + // Get the id of this node. + std::size_t id() const; + + // Destructor. + ~NodeData(); + +private: + // Constructor. + NodeData( + std::size_t id, + std::shared_ptr entity, + zc_owned_liveliness_token_t token); + // Internal mutex. + mutable std::mutex mutex_; + // The entity id of this node as generated by get_next_entity_id(). + // Every interface created by this node will include this id in its liveliness token. + std::size_t id_; + // The Entity generated for the node. + std::shared_ptr entity_; + // Liveliness token for the node. + zc_owned_liveliness_token_t token_; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_NODE_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 7bffef42..aa445ef4 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -167,7 +167,12 @@ rmw_create_node( context->impl, "expected initialized context", return nullptr); - if (context->impl->is_shutdown()) { + rmw_context_impl_t * context_impl = static_cast(context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, + "expected initialized context_impl", + return nullptr); + if (context_impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has been shutdown"); return nullptr; } @@ -227,67 +232,18 @@ rmw_create_node( allocator->deallocate(const_cast(node->namespace_), allocator->state); }); - // Put metadata into node->data. - auto node_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_node_data_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "failed to allocate memory for node data", - return nullptr); - auto free_node_data = rcpputils::make_scope_exit( - [node_data, allocator]() { - allocator->deallocate(node_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - node_data, node_data, return nullptr, - rmw_zenoh_cpp::rmw_node_data_t); - auto destruct_node_data = rcpputils::make_scope_exit( - [node_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - node_data->~rmw_node_data_t(), rmw_zenoh_cpp::rmw_node_data_t); - }); - - // Initialize liveliness token for the node to advertise that a new node is in town. - node_data->id = context->impl->get_next_entity_id(); - z_session_t session = context->impl->session(); - node_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id), - std::to_string(node_data->id), - rmw_zenoh_cpp::liveliness::EntityType::Node, - rmw_zenoh_cpp::liveliness::NodeInfo{context->actual_domain_id, namespace_, name, - context->impl->enclave()}); - if (node_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the node %s.", - name); - return nullptr; - } - node_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(node_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [node_data]() { - z_drop(z_move(node_data->token)); - }); - if (!z_check(node_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the node."); + // Create the NodeData for this node. + if (!context_impl->create_node_data(node, namespace_, name)) { + // Error already set. return nullptr; } node->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; node->context = context; - node->data = node_data; + // Store type erased rmw_context_impl_s in node->data so that the NodeData + // can be safely accessed. + node->data = context->impl; - free_token.cancel(); - free_node_data.cancel(); - destruct_node_data.cancel(); free_namespace.cancel(); free_name.cancel(); free_node.cancel(); @@ -313,14 +269,15 @@ rmw_destroy_node(rmw_node_t * node) // Undeclare liveliness token for the node to advertise that the node has ridden // off into the sunset. - rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - if (node_data != nullptr) { - zc_liveliness_undeclare_token(z_move(node_data->token)); - RMW_TRY_DESTRUCTOR(node_data->~rmw_node_data_t(), rmw_node_data_t, ); - allocator->deallocate(node_data, allocator->state); + rmw_context_impl_s * context_impl = + static_cast(node->data); + if (context_impl == nullptr) { + RMW_SET_ERROR_MSG("Unable to cast node->data into rmw_context_impl_s."); + return RMW_RET_ERROR; } + context_impl->delete_node_data(node); + allocator->deallocate(const_cast(node->namespace_), allocator->state); allocator->deallocate(const_cast(node->name), allocator->state); allocator->deallocate(node, allocator->state); @@ -431,9 +388,6 @@ rmw_create_publisher( "Strict requirement on unique network flow endpoints for publishers not supported"); return nullptr; } - const rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, nullptr); // Get the RMW type support. const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); @@ -567,10 +521,14 @@ rmw_create_publisher( z_session_t session = context_impl->session(); const z_id_t zid = z_info_zid(session); - + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_FOR_NULL_WITH_MSG( + node_data, + "NodeData not found.", + return nullptr); publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( zid, - std::to_string(node_data->id), + std::to_string(node_data->id()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Publisher, @@ -1285,10 +1243,6 @@ rmw_create_subscription( } RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); - auto node_data = static_cast(node->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, "unable to create subscription as node_data is invalid.", - return nullptr); // TODO(yadunund): Check if a duplicate entry for the same topic name + topic type // is present in node_data->subscriptions and if so return error; RMW_CHECK_FOR_NULL_WITH_MSG( @@ -1415,12 +1369,16 @@ rmw_create_subscription( }); z_session_t session = context_impl->session(); - + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_FOR_NULL_WITH_MSG( + node_data, + "NodeData not found.", + return nullptr); // Everything above succeeded and is setup properly. Now declare a subscriber // with Zenoh; after this, callbacks may come in at any time. sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(node_data->id()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Subscription, @@ -2106,14 +2064,6 @@ rmw_create_client( RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); - const rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - if (node_data == nullptr) { - RMW_SET_ERROR_MSG( - "Unable to create client as node data is invalid."); - return nullptr; - } rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -2288,9 +2238,14 @@ rmw_create_client( }); z_session_t session = context_impl->session(); + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_FOR_NULL_WITH_MSG( + node_data, + "NodeData not found.", + return nullptr); client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(node_data->id()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Client, @@ -2680,14 +2635,6 @@ rmw_create_service( return nullptr; } } - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); - const rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - if (node_data == nullptr) { - RMW_SET_ERROR_MSG( - "Unable to create service as node data is invalid."); - return nullptr; - } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -2862,10 +2809,14 @@ rmw_create_service( }); z_session_t session = context_impl->session(); - + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_FOR_NULL_WITH_MSG( + node_data, + "NodeData not found.", + return nullptr); service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(node_data->id()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Service,