From 8f897b5363ee61960c98cffcbbf3e4df483238b7 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 7 Aug 2024 04:36:56 +0800 Subject: [PATCH] Store rmw_node_data_t in rmw_context_impl_s Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + .../src/detail/rmw_context_impl_s.cpp | 74 +++++--- .../src/detail/rmw_context_impl_s.hpp | 17 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 14 -- rmw_zenoh_cpp/src/detail/rmw_node_data_t.cpp | 104 ++++++++++++ rmw_zenoh_cpp/src/detail/rmw_node_data_t.hpp | 67 ++++++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 160 ++++++++---------- 7 files changed, 311 insertions(+), 126 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_node_data_t.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_node_data_t.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 78873587..b4a42713 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/qos.cpp src/detail/rmw_context_impl_s.cpp src/detail/rmw_data_types.cpp + src/detail/rmw_node_data_t.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp src/detail/type_support_common.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index e1559ab3..8ea0c705 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -44,7 +44,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void } // Update the graph cache. - std::lock_guard lock(data_ptr->mutex_); + std::lock_guard lock(data_ptr->mutex_); if (data_ptr->is_shutdown_) { return; } @@ -169,7 +169,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe() ///============================================================================= rmw_ret_t rmw_context_impl_s::Data::shutdown() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (is_shutdown_) { return RMW_RET_OK; } @@ -232,35 +232,35 @@ rmw_context_impl_s::rmw_context_impl_s( ///============================================================================= std::string rmw_context_impl_s::enclave() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->enclave_; } ///============================================================================= z_session_t rmw_context_impl_s::session() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return z_loan(data_->session_); } ///============================================================================= std::optional & rmw_context_impl_s::shm_manager() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->shm_manager_; } ///============================================================================= rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_guard_condition_; } ///============================================================================= size_t rmw_context_impl_s::get_next_entity_id() { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->next_entity_id_++; } @@ -273,14 +273,14 @@ rmw_ret_t rmw_context_impl_s::shutdown() ///============================================================================= bool rmw_context_impl_s::is_shutdown() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->is_shutdown_; } ///============================================================================= bool rmw_context_impl_s::session_is_valid() const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return z_check(data_->session_); } @@ -291,7 +291,7 @@ rmw_ret_t rmw_context_impl_s::get_node_names( rcutils_string_array_t * enclaves, rcutils_allocator_t * allocator) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->get_node_names( node_names, node_namespaces, @@ -305,7 +305,7 @@ rmw_ret_t rmw_context_impl_s::get_topic_names_and_types( bool no_demangle, rmw_names_and_types_t * topic_names_and_types) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->get_topic_names_and_types( allocator, no_demangle, @@ -317,7 +317,7 @@ rmw_ret_t rmw_context_impl_s::publisher_count_matched_subscriptions( const rmw_publisher_t * publisher, size_t * subscription_count) { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->publisher_count_matched_subscriptions( publisher, subscription_count); @@ -328,7 +328,7 @@ rmw_ret_t rmw_context_impl_s::subscription_count_matched_publishers( const rmw_subscription_t * subscription, size_t * publisher_count) { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->subscription_count_matched_publishers( subscription, publisher_count); @@ -339,7 +339,7 @@ rmw_ret_t rmw_context_impl_s::get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->get_service_names_and_types( allocator, service_names_and_types); @@ -350,7 +350,7 @@ rmw_ret_t rmw_context_impl_s::count_publishers( const char * topic_name, size_t * count) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->count_publishers( topic_name, count); @@ -361,7 +361,7 @@ rmw_ret_t rmw_context_impl_s::count_subscriptions( const char * topic_name, size_t * count) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->count_subscriptions( topic_name, count); @@ -372,7 +372,7 @@ rmw_ret_t rmw_context_impl_s::count_services( const char * service_name, size_t * count) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->count_services( service_name, count); @@ -383,7 +383,7 @@ rmw_ret_t rmw_context_impl_s::count_clients( const char * service_name, size_t * count) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->count_clients( service_name, count); @@ -398,7 +398,7 @@ rmw_ret_t rmw_context_impl_s::get_entity_names_and_types_by_node( bool no_demangle, rmw_names_and_types_t * names_and_types) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->get_entity_names_and_types_by_node( entity_type, allocator, @@ -416,7 +416,7 @@ rmw_ret_t rmw_context_impl_s::get_entities_info_by_topic( bool no_demangle, rmw_topic_endpoint_info_array_t * endpoints_info) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->get_entities_info_by_topic( entity_type, allocator, @@ -431,7 +431,7 @@ rmw_ret_t rmw_context_impl_s::service_server_is_available( const char * service_type, bool * is_available) const { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->service_server_is_available( service_name, service_type, @@ -444,9 +444,39 @@ void rmw_context_impl_s::set_qos_event_callback( const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, GraphCacheEventCallback callback) { - std::lock_guard lock(data_->mutex_); + std::lock_guard lock(data_->mutex_); return data_->graph_cache_->set_qos_event_callback( std::move(entity), event_type, std::move(callback)); } + +///============================================================================= +bool rmw_context_impl_s::add_node( + const rmw_node_t * const node, + std::unique_ptr node_data) +{ + std::lock_guard lock(data_->mutex_); + auto insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data))); + return insertion.second; +} + +///============================================================================= +void rmw_context_impl_s::remove_node( + const rmw_node_t * const node) +{ + std::lock_guard lock(data_->mutex_); + data_->nodes_.erase(node); +} + +///============================================================================= +std::optional rmw_context_impl_s::get_node_id( + const rmw_node_t * const node) const +{ + std::lock_guard lock(data_->mutex_); + auto it = data_->nodes_.find(node); + if (it == data_->nodes_.end()) { + return std::nullopt; + } + return it->second->id(); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 15f3a632..5e237e4f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -21,9 +21,11 @@ #include #include #include +#include #include "graph_cache.hpp" #include "liveliness_utils.hpp" +#include "rmw_node_data_t.hpp" #include "rcutils/types.h" #include "rmw/rmw.h" @@ -33,6 +35,7 @@ class rmw_context_impl_s final { public: using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback; + using rmw_node_data_t = rmw_zenoh_cpp::rmw_node_data_t; // Constructor. // Once constructed, the context_impl instanced will manage the lifetime @@ -136,6 +139,16 @@ class rmw_context_impl_s final const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, GraphCacheEventCallback callback); + bool add_node( + const rmw_node_t * const node, + std::unique_ptr node_data); + + void remove_node( + const rmw_node_t * const node); + + std::optional get_node_id( + const rmw_node_t * const node) const; + private: // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing @@ -161,7 +174,7 @@ class rmw_context_impl_s final ~Data(); // Mutex to lock when accessing members. - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // RMW allocator. const rcutils_allocator_t * allocator_; // Enclave, name used to find security artifacts in a sros2 keystore. @@ -186,6 +199,8 @@ class rmw_context_impl_s final std::unique_ptr graph_cache_; // True once graph subscriber is initialized. bool is_initialized_; + // Nodes created from this context. + std::unordered_map> nodes_; }; std::shared_ptr data_{nullptr}; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 5ed480e7..e5097988 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -44,20 +44,6 @@ namespace rmw_zenoh_cpp { -///============================================================================= -struct rmw_node_data_t -{ - // The Entity generated for the node. - std::shared_ptr entity; - - // Liveliness token for the node. - zc_owned_liveliness_token_t token; - - // The entity id of this node as generated by get_next_entity_id(). - // Every interface created by this node will include this id in its liveliness token. - size_t id; -}; - ///============================================================================= class rmw_publisher_data_t final { diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data_t.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data_t.cpp new file mode 100644 index 00000000..6cfd9e19 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data_t.cpp @@ -0,0 +1,104 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_node_data_t.hpp" + +#include + +#include "logging_macros.hpp" + +#include "rcpputils/scope_exit.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +std::unique_ptr rmw_node_data_t::make( + z_session_t session, + std::size_t domain_id, + std::size_t node_id, + const std::string & enclave, + const std::string & ns, + const std::string & node_name) +{ + // Create the entity. + auto entity = rmw_zenoh_cpp::liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(node_id), + rmw_zenoh_cpp::liveliness::EntityType::Node, + rmw_zenoh_cpp::liveliness::NodeInfo{ + domain_id, + ns, + node_name, + enclave + } + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create node entity."); + return nullptr; + } + + // Create the liveliness token. + zc_owned_liveliness_token_t token = zc_liveliness_declare_token( + session, + z_keyexpr(entity->keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [&token]() { + z_drop(z_move(token)); + }); + if (!z_check(token)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the node."); + return nullptr; + } + free_token.cancel(); + + return std::unique_ptr( + new rmw_node_data_t( + std::move(node_id), + std::move(entity), + std::move(token) + )); +} + +///============================================================================= +rmw_node_data_t::rmw_node_data_t( + std::size_t id, + std::shared_ptr entity, + zc_owned_liveliness_token_t token) +: id_(std::move(id)), + entity_(std::move(entity)), + token_(std::move(token)) +{ + // Do nothing. +} + +///============================================================================= +rmw_node_data_t::~rmw_node_data_t() +{ + zc_liveliness_undeclare_token(z_move(token_)); +} + +///============================================================================= +std::size_t rmw_node_data_t::id() const +{ + std::lock_guard lock(mutex_); + return id_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data_t.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data_t.hpp new file mode 100644 index 00000000..151f5e77 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data_t.hpp @@ -0,0 +1,67 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_NODE_DATA_T_HPP_ +#define DETAIL__RMW_NODE_DATA_T_HPP_ + +#include + +#include +#include +#include + +#include "liveliness_utils.hpp" + +/// Structs for various type erased data fields. +namespace rmw_zenoh_cpp +{ +///============================================================================= +class rmw_node_data_t +{ +public: + // Create an instance of rmw_node_data_t. Returns nullptr if construction fails. + static std::unique_ptr make( + z_session_t session, + std::size_t domain_id, + std::size_t node_id, + const std::string & enclave, + const std::string & ns, + const std::string & node_name); + + // Get the id of this node. + std::size_t id() const; + + // Destructor. + ~rmw_node_data_t(); + +private: + // Constructor. + rmw_node_data_t( + std::size_t id, + std::shared_ptr entity, + zc_owned_liveliness_token_t token); + + // Internal mutex. + mutable std::mutex mutex_; + // The entity id of this node as generated by get_next_entity_id(). + // Every interface created by this node will include this id in its liveliness token. + std::size_t id_; + // The Entity generated for the node. + std::shared_ptr entity_; + // Liveliness token for the node. + zc_owned_liveliness_token_t token_; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_NODE_DATA_T_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4678ff08..f4173910 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -38,6 +38,7 @@ #include "detail/qos.hpp" #include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" +#include "detail/rmw_node_data_t.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" @@ -211,7 +212,12 @@ rmw_create_node( context->impl, "expected initialized context", return nullptr); - if (context->impl->is_shutdown()) { + rmw_context_impl_t * context_impl = static_cast(context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, + "expected initialized context_impl", + return nullptr); + if (context_impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has been shutdown"); return nullptr; } @@ -271,66 +277,32 @@ rmw_create_node( allocator->deallocate(const_cast(node->namespace_), allocator->state); }); - // Put metadata into node->data. - auto node_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_node_data_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "failed to allocate memory for node data", - return nullptr); - auto free_node_data = rcpputils::make_scope_exit( - [node_data, allocator]() { - allocator->deallocate(node_data, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - node_data, node_data, return nullptr, - rmw_zenoh_cpp::rmw_node_data_t); - auto destruct_node_data = rcpputils::make_scope_exit( - [node_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - node_data->~rmw_node_data_t(), rmw_zenoh_cpp::rmw_node_data_t); - }); - - // Initialize liveliness token for the node to advertise that a new node is in town. - node_data->id = context->impl->get_next_entity_id(); - z_session_t session = context->impl->session(); - node_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id), - std::to_string(node_data->id), - rmw_zenoh_cpp::liveliness::EntityType::Node, - rmw_zenoh_cpp::liveliness::NodeInfo{context->actual_domain_id, namespace_, name, - context->impl->enclave()}); - if (node_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the node."); + // Create an instance of rmw_node_data_t and store it in the context impl. + auto node_data = rmw_zenoh_cpp::rmw_node_data_t::make( + context_impl->session(), + context->actual_domain_id, + context_impl->get_next_entity_id(), + context_impl->enclave(), + namespace_, + name); + if (node_data == nullptr) { + // Error already handled. return nullptr; } - node_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(node_data->entity->keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [node_data]() { - z_drop(z_move(node_data->token)); - }); - if (!z_check(node_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the node."); + + // Add node_data to rmw_context_impl_s. + if (!context_impl->add_node( + node, + std::move(node_data))) + { return nullptr; } node->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; node->context = context; - node->data = node_data; + // The rmw_node_data_t instance is stored in rmw_context_impl_s. + node->data = nullptr; - free_token.cancel(); - free_node_data.cancel(); - destruct_node_data.cancel(); free_namespace.cancel(); free_name.cancel(); free_node.cancel(); @@ -345,7 +317,8 @@ rmw_destroy_node(rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -354,15 +327,8 @@ rmw_destroy_node(rmw_node_t * node) rcutils_allocator_t * allocator = &node->context->options.allocator; - // Undeclare liveliness token for the node to advertise that the node has ridden - // off into the sunset. - rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - if (node_data != nullptr) { - zc_liveliness_undeclare_token(z_move(node_data->token)); - RMW_TRY_DESTRUCTOR(node_data->~rmw_node_data_t(), rmw_node_data_t, ); - allocator->deallocate(node_data, allocator->state); - } + // Remove the node from rmw_context_impl_t. + context_impl->remove_node(node); allocator->deallocate(const_cast(node->namespace_), allocator->state); allocator->deallocate(const_cast(node->name), allocator->state); @@ -474,9 +440,6 @@ rmw_create_publisher( "Strict requirement on unique network flow endpoints for publishers not supported"); return nullptr; } - const rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - RMW_CHECK_ARGUMENT_FOR_NULL(node_data, nullptr); // Get the RMW type support. const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); @@ -499,11 +462,21 @@ rmw_create_publisher( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (context_impl->is_shutdown()) { + RMW_SET_ERROR_MSG("context is shutdown"); + return nullptr; + } if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } + std::optional maybe_node_id = context_impl->get_node_id(node); + if (!maybe_node_id.has_value()) { + RMW_SET_ERROR_MSG("node data not found in context."); + return nullptr; + } + rcutils_allocator_t * allocator = &node->context->options.allocator; // Create the publisher. @@ -670,7 +643,7 @@ rmw_create_publisher( publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(maybe_node_id.value()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Publisher, @@ -1306,11 +1279,6 @@ rmw_create_subscription( return nullptr; } - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); - auto node_data = static_cast(node->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, "unable to create subscription as node_data is invalid.", - return nullptr); // TODO(yadunund): Check if a duplicate entry for the same topic name + topic type // is present in node_data->subscriptions and if so return error; RMW_CHECK_FOR_NULL_WITH_MSG( @@ -1327,11 +1295,21 @@ rmw_create_subscription( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (context_impl->is_shutdown()) { + RMW_SET_ERROR_MSG("context is shutdown"); + return nullptr; + } if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } + std::optional maybe_node_id = context_impl->get_node_id(node); + if (!maybe_node_id.has_value()) { + RMW_SET_ERROR_MSG("node data not found in context."); + return nullptr; + } + rcutils_allocator_t * allocator = &node->context->options.allocator; // Create the rmw_subscription. @@ -1519,7 +1497,7 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(maybe_node_id.value()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Subscription, @@ -2074,16 +2052,18 @@ rmw_create_client( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (context_impl->is_shutdown()) { + RMW_SET_ERROR_MSG("context is shutdown"); + return nullptr; + } if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); - const rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - if (node_data == nullptr) { - RMW_SET_ERROR_MSG( - "Unable to create client as node data is invalid."); + + std::optional maybe_node_id = context_impl->get_node_id(node); + if (!maybe_node_id.has_value()) { + RMW_SET_ERROR_MSG("node data not found in context."); return nullptr; } @@ -2276,7 +2256,7 @@ rmw_create_client( z_session_t session = context_impl->session(); client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(maybe_node_id.value()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Client, @@ -2653,14 +2633,6 @@ rmw_create_service( return nullptr; } } - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); - const rmw_zenoh_cpp::rmw_node_data_t * node_data = - static_cast(node->data); - if (node_data == nullptr) { - RMW_SET_ERROR_MSG( - "Unable to create service as node data is invalid."); - return nullptr; - } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -2675,11 +2647,21 @@ rmw_create_service( context_impl, "unable to get rmw_context_impl_s", return nullptr); + if (context_impl->is_shutdown()) { + RMW_SET_ERROR_MSG("context is shutdown"); + return nullptr; + } if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } + std::optional maybe_node_id = context_impl->get_node_id(node); + if (!maybe_node_id.has_value()) { + RMW_SET_ERROR_MSG("node data not found in context."); + return nullptr; + } + // SERVICE DATA ============================================================== rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -2874,7 +2856,7 @@ rmw_create_service( service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( z_info_zid(session), - std::to_string(node_data->id), + std::to_string(maybe_node_id.value()), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Service,