From d19c278ef7d74857f44b550e5f1b4b2d7d3623c8 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sat, 28 Sep 2024 07:13:34 +0800 Subject: [PATCH] Create PublisherData within NodeData Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + rmw_zenoh_cpp/src/detail/graph_cache.cpp | 13 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 3 +- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 21 + rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 10 +- .../src/detail/rmw_context_impl_s.cpp | 13 +- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 7 - rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 39 -- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 111 ++++- rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 33 ++ .../src/detail/rmw_publisher_data.cpp | 456 +++++++++++++++++ .../src/detail/rmw_publisher_data.hpp | 116 +++++ rmw_zenoh_cpp/src/rmw_event.cpp | 27 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 464 ++++-------------- 14 files changed, 881 insertions(+), 433 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 95aab3d6..631bf6cb 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/qos.cpp src/detail/rmw_context_impl_s.cpp src/detail/rmw_data_types.cpp + src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index bdc87a7b..74b50658 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types( ///============================================================================= rmw_ret_t GraphCache::publisher_count_matched_subscriptions( - const rmw_publisher_t * publisher, + PublisherDataConstPtr pub_data, size_t * subscription_count) { // TODO(Yadunund): Replace this logic by returning a number that is tracked once // we support matched qos events. *subscription_count = 0; - GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name); + auto topic_info = pub_data->entity()->topic_info().value(); + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_); if (topic_it != graph_topics_.end()) { - rmw_publisher_data_t * pub_data = - static_cast(publisher->data); GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find( - pub_data->type_support->get_name()); + topic_info.type_); if (topic_data_it != topic_it->second.end()) { for (const auto & [_, topic_data] : topic_data_it->second) { // If a subscription exists with compatible QoS, update the subscription count. if (!topic_data->subs_.empty()) { rmw_qos_compatibility_type_t is_compatible; rmw_ret_t ret = rmw_qos_profile_check_compatible( - pub_data->adapted_qos_profile, + pub_data->adapted_qos_profile(), topic_data->info_.qos_, &is_compatible, nullptr, @@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback( const rmw_subscription_data_t * sub_data, QueryingSubscriberCallback cb) { - const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_; + const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_; auto cb_it = querying_subs_cbs_.find(keyexpr); if (cb_it == querying_subs_cbs_.end()) { querying_subs_cbs_[keyexpr] = std::move( diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index a8cceecd..420e91ed 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -27,6 +27,7 @@ #include "event.hpp" #include "liveliness_utils.hpp" #include "ordered_map.hpp" +#include "rmw_publisher_data.hpp" #include "rcutils/allocator.h" #include "rcutils/types.h" @@ -133,7 +134,7 @@ class GraphCache final rmw_names_and_types_t * topic_names_and_types) const; rmw_ret_t publisher_count_matched_subscriptions( - const rmw_publisher_t * publisher, + PublisherDataConstPtr pub_data, size_t * subscription_count); rmw_ret_t subscription_count_matched_publishers( diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 5b86d073..38b5855f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -15,7 +15,9 @@ #include "liveliness_utils.hpp" #include +#include #include +#include #include #include #include @@ -609,6 +611,12 @@ std::string Entity::node_enclave() const return this->node_info_.enclave_; } +///============================================================================= +NodeInfo Entity::node_info() const +{ + return this->node_info_; +} + ///============================================================================= std::optional Entity::topic_info() const { @@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input) return output; } } // namespace liveliness + +void +generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]) +{ + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist( + std::numeric_limits::min(), std::numeric_limits::max()); + + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + gid[i] = dist(rng); + } +} } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 6fa09833..18cc45a4 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -160,7 +160,10 @@ class Entity std::string node_enclave() const; - /// Get the topic_info. + /// Get the NodeInfo. + NodeInfo node_info() const; + + /// Get the TopicInfo if present. std::optional topic_info() const; /// Get the liveliness keyexpr for this entity. @@ -230,8 +233,13 @@ std::optional keyexpr_to_qos(const std::string & keyexpr); /// Convert a Zenoh id to a string. std::string zid_to_str(const z_id_t & id); } // namespace liveliness + +///============================================================================= +// Helper function to generate a randon GID. +void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]); } // namespace rmw_zenoh_cpp + ///============================================================================= // Allow Entity to be hashed and used as a key in unordered_maps/sets namespace std diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 875f7c09..3a6148ee 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -149,8 +149,17 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() rmw_ret_t rmw_context_impl_s::Data::shutdown() { std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; if (is_shutdown_) { - return RMW_RET_OK; + return ret; + } + + // Shutdown all the nodes in this context. + for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { + ret = node_it->second->shutdown(); + if (ret != RMW_RET_OK) { + return ret; + } } z_undeclare_subscriber(z_move(graph_subscriber_)); @@ -170,6 +179,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown() rmw_context_impl_s::Data::~Data() { auto ret = this->shutdown(); + nodes_.clear(); static_cast(ret); } @@ -407,6 +417,7 @@ bool rmw_context_impl_s::create_node_data( } auto node_data = rmw_zenoh_cpp::NodeData::make( + node, this->get_next_entity_id(), z_loan(data_->session_), data_->domain_id_, diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 5beb6edd..4b56a5e0 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data() z_drop(z_move(payload)); } -///============================================================================= -size_t rmw_publisher_data_t::get_next_sequence_number() -{ - std::lock_guard lock(sequence_number_mutex_); - return sequence_number_++; -} - ///============================================================================= bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index e5097988..90c4279c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -44,45 +44,6 @@ namespace rmw_zenoh_cpp { -///============================================================================= -class rmw_publisher_data_t final -{ -public: - // The Entity generated for the publisher. - std::shared_ptr entity; - - // An owned publisher. - z_owned_publisher_t pub; - - // Optional publication cache when durability is transient_local. - std::optional pub_cache; - - // Store the actual QoS profile used to configure this publisher. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the publisher. - zc_owned_liveliness_token_t token; - - // Type support fields - const void * type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - MessageTypeSupport * type_support; - - // Context for memory allocation for messages. - rmw_context_t * context; - - uint8_t pub_gid[RMW_GID_STORAGE_SIZE]; - - size_t get_next_sequence_number(); - - EventsManager events_mgr; - -private: - std::mutex sequence_number_mutex_; - size_t sequence_number_{1}; -}; - ///============================================================================= // z_owned_closure_sample_t void sub_data_handler(const z_sample_t * sample, void * sub_data); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index cbfd0314..04d05483 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -26,6 +26,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr NodeData::make( + const rmw_node_t * const node, std::size_t id, z_session_t session, std::size_t domain_id, @@ -73,6 +74,7 @@ std::shared_ptr NodeData::make( return std::shared_ptr( new NodeData{ + node, id, std::move(entity), std::move(token) @@ -81,12 +83,16 @@ std::shared_ptr NodeData::make( ///============================================================================= NodeData::NodeData( + const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, zc_owned_liveliness_token_t token) -: id_(std::move(id)), +: node_(node), + id_(std::move(id)), entity_(std::move(entity)), - token_(std::move(token)) + token_(std::move(token)), + is_shutdown_(false), + pubs_({}) { // Do nothing. } @@ -94,7 +100,14 @@ NodeData::NodeData( ///============================================================================= NodeData::~NodeData() { - zc_liveliness_undeclare_token(z_move(token_)); + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing node /%s.", + entity_->node_name().c_str() + ); + } } ///============================================================================= @@ -103,4 +116,96 @@ std::size_t NodeData::id() const std::lock_guard lock(mutex_); return id_; } + +///============================================================================= +bool NodeData::create_pub_data( + const rmw_publisher_t * const publisher, + z_session_t session, + std::size_t id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (pubs_.count(publisher) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "PublisherData already exists."); + return false; + } + + auto pub_data = PublisherData::make( + std::move(session), + node_, + entity_->node_info(), + id_, + std::move(id), + std::move(topic_name), + type_support, + qos_profile); + if (pub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make PublisherData."); + return false; + } + + auto insertion = pubs_.insert(std::make_pair(publisher, std::move(pub_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher) +{ + std::lock_guard lock_guard(mutex_); + auto it = pubs_.find(publisher); + if (it == pubs_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) +{ + std::lock_guard lock_guard(mutex_); + pubs_.erase(publisher); +} + +///============================================================================= +rmw_ret_t NodeData::shutdown() +{ + std::lock_guard lock(mutex_); + rmw_ret_t ret = RMW_RET_OK; + if (is_shutdown_) { + return ret; + } + + // Shutdown all the entities within this node. + for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) { + ret = pub_it->second->shutdown(); + if (ret != RMW_RET_OK) { + return ret; + } + } + + // Unregister this node from the ROS graph. + zc_liveliness_undeclare_token(z_move(token_)); + + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +// Check if the Node is shutdown. +bool NodeData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index 737c19b0..1f1d64cd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -20,8 +20,12 @@ #include #include #include +#include #include "liveliness_utils.hpp" +#include "rmw_publisher_data.hpp" + +#include "rmw/rmw.h" namespace rmw_zenoh_cpp { @@ -32,6 +36,7 @@ class NodeData final public: // Make a shared_ptr of NodeData. Returns nullptr if construction fails. static std::shared_ptr make( + const rmw_node_t * const node, std::size_t id, z_session_t session, std::size_t domain_id, @@ -42,17 +47,41 @@ class NodeData final // Get the id of this node. std::size_t id() const; + // Create a new PublisherData for a publisher in this node. + bool create_pub_data( + const rmw_publisher_t * const publisher, + z_session_t session, + std::size_t id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + /// Retrieve the PublisherData for a given rmw_publisher_t if present. + PublisherDataPtr get_pub_data(const rmw_publisher_t * const publisher); + + // Delete the PublisherData for a given rmw_publisher_t if present. + void delete_pub_data(const rmw_publisher_t * const publisher); + + // Shutdown this NodeData. + rmw_ret_t shutdown(); + + // Check if this NodeData is shutdown. + bool is_shutdown() const; + // Destructor. ~NodeData(); private: // Constructor. NodeData( + const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, zc_owned_liveliness_token_t token); // Internal mutex. mutable std::mutex mutex_; + // The rmw_node_t associated with this NodeData. + const rmw_node_t * node_; // The entity id of this node as generated by get_next_entity_id(). // Every interface created by this node will include this id in its liveliness token. std::size_t id_; @@ -60,6 +89,10 @@ class NodeData final std::shared_ptr entity_; // Liveliness token for the node. zc_owned_liveliness_token_t token_; + // Shutdown flag. + bool is_shutdown_; + // Map of publishers. + std::unordered_map pubs_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp new file mode 100644 index 00000000..10505fef --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -0,0 +1,456 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_publisher_data.hpp" + +#include + +#include +#include +#include +#include +#include + +#include "cdr.hpp" +#include "rmw_context_impl_s.hpp" +#include "message_type_support.hpp" +#include "logging_macros.hpp" +#include "qos.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace rmw_zenoh_cpp +{ +namespace +{ +z_owned_bytes_map_t +create_map_and_set_sequence_num(int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) +{ + z_owned_bytes_map_t map = z_bytes_map_new(); + if (!z_check(map)) { + RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); + return z_bytes_map_null(); + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. + // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. + char seq_id_str[20]; + if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + char source_ts_str[20]; + if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); + + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = gid; + + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes); + + free_attachment_map.cancel(); + + return map; +} +} // namespace +///============================================================================= +std::shared_ptr PublisherData::make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t publisher_id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + auto pub_data = std::shared_ptr(new PublisherData{}); + pub_data->rmw_node_ = node; + generate_random_gid(pub_data->gid_); + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + pub_data->type_support_impl_ = type_support->data; + auto callbacks = static_cast(type_support->data); + pub_data->type_support_ = std::make_unique(callbacks); + + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto always_free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + std::size_t domain_id = node_info.domain_id_; + pub_data->entity_ = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(publisher_id), + liveliness::EntityType::Publisher, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + topic_name, + pub_data->type_support_->get_name(), + type_hash_c_str, + adapted_qos_profile} + ); + if (pub_data->entity_ == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the publisher %s.", + topic_name.c_str()); + return nullptr; + } + z_owned_keyexpr_t keyexpr = z_keyexpr_new( + pub_data->entity_->topic_info()->topic_keyexpr_.c_str()); + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_keyexpr_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + // Create a Publication Cache if durability is transient_local. + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = adapted_qos_profile.depth; + pub_cache_opts.queryable_complete = true; + // Set the queryable_prefix to the session id so that querying subscribers can specify this + // session id to obtain latest data from this specific publication caches when querying over + // the same keyexpression. + // When such a prefix is added to the PublicationCache, it listens to queries with this extra + // prefix (allowing to be queried in a unique way), but still replies with the original + // publications' key expressions. + z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(pub_data->entity_->zid().c_str()); + auto always_free_queryable_prefix = rcpputils::make_scope_exit( + [&queryable_prefix]() { + z_keyexpr_drop(z_move(queryable_prefix)); + }); + pub_cache_opts.queryable_prefix = z_loan(queryable_prefix); + pub_data->pub_cache_ = ze_declare_publication_cache( + session, + z_loan(keyexpr), + &pub_cache_opts + ); + if (!pub_data->pub_cache_.has_value() || !z_check(pub_data->pub_cache_.value())) { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } + } + auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( + [pub_data]() { + if (pub_data && pub_data->pub_cache_.has_value()) { + z_drop(z_move(pub_data->pub_cache_.value())); + } + }); + + // Set congestion_control to BLOCK if appropriate. + z_publisher_options_t opts = z_publisher_options_default(); + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } + // TODO(clalancette): What happens if the key name is a valid but empty string? + pub_data->pub_ = z_declare_publisher( + session, + z_loan(keyexpr), + &opts + ); + if (!z_check(pub_data->pub_)) { + RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); + return nullptr; + } + auto undeclare_z_publisher = rcpputils::make_scope_exit( + [pub_data]() { + z_undeclare_publisher(z_move(pub_data->pub_)); + }); + + pub_data->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(pub_data->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [pub_data]() { + if (pub_data != nullptr) { + z_drop(z_move(pub_data->token_)); + } + }); + if (!z_check(pub_data->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the publisher."); + return nullptr; + } + + // Initialize the events manager. + pub_data->events_mgr_ = std::make_shared(); + + free_token.cancel(); + undeclare_z_publisher_cache.cancel(); + undeclare_z_publisher.cancel(); + + return pub_data; +} + +///============================================================================= +PublisherData::PublisherData() +: sequence_number_(1), + is_shutdown_(false) +{ + // Do nothing. +} + +///============================================================================= +rmw_qos_profile_t PublisherData::adapted_qos_profile() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info()->qos_; +} + + +///============================================================================= +rmw_ret_t PublisherData::publish( + const void * ros_message, + std::optional & shm_manager) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + RMW_SET_ERROR_MSG("Unable to publish as the publisher has been shutdown."); + return RMW_RET_ERROR; + } + + // Serialize data. + size_t max_data_length = type_support_->get_estimated_serialized_size( + ros_message, + type_support_impl_); + + // To store serialized message byte array. + char * msg_bytes = nullptr; + std::optional shmbuf = std::nullopt; + auto always_free_shmbuf = rcpputils::make_scope_exit( + [&shmbuf]() { + if (shmbuf.has_value()) { + zc_shmbuf_drop(&shmbuf.value()); + } + }); + + rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + + auto always_free_msg_bytes = rcpputils::make_scope_exit( + [&msg_bytes, allocator, &shmbuf]() { + if (msg_bytes && !shmbuf.has_value()) { + allocator->deallocate(msg_bytes, allocator->state); + } + }); + + // Get memory from SHM buffer if available. + if (shm_manager.has_value() && + zc_shm_manager_check(&shm_manager.value())) + { + shmbuf = zc_shm_alloc( + &shm_manager.value(), + max_data_length); + if (!z_check(shmbuf.value())) { + zc_shm_gc(&shm_manager.value()); + shmbuf = zc_shm_alloc(&shm_manager.value(), max_data_length); + if (!z_check(shmbuf.value())) { + // TODO(Yadunund): Should we revert to regular allocation and not return an error? + RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); + return RMW_RET_ERROR; + } + } + msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf.value())); + } else { + // Get memory from the allocator. + msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr ser(fastbuffer); + if (!type_support_->serialize_ros_message( + ros_message, + ser.get_cdr(), + type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not serialize ROS message"); + return RMW_RET_ERROR; + } + + const size_t data_length = ser.get_serialized_data_length(); + + const int64_t sequence_number = sequence_number_++; + + z_owned_bytes_map_t map = + create_map_and_set_sequence_num(sequence_number, gid_); + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto always_free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + int ret; + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options = z_publisher_put_options_default(); + options.attachment = z_bytes_map_as_attachment(&map); + + if (shmbuf.has_value()) { + zc_shmbuf_set_length(&shmbuf.value(), data_length); + zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value())); + ret = zc_publisher_put_owned(z_loan(pub_), z_move(payload), &options); + } else { + // Returns 0 if success. + ret = z_publisher_put( + z_loan(pub_), + reinterpret_cast(msg_bytes), + data_length, + &options); + } + if (ret) { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +///============================================================================= +std::shared_ptr PublisherData::entity() const +{ + std::lock_guard lock(mutex_); + return entity_; +} + +///============================================================================= +size_t PublisherData::get_next_sequence_number() +{ + std::lock_guard lock(mutex_); + return sequence_number_++; +} + +///============================================================================= +const uint8_t * PublisherData::gid() const +{ + std::lock_guard lock(mutex_); + return gid_; +} + +///============================================================================= +z_publisher_t PublisherData::publisher() const +{ + std::lock_guard lock(mutex_); + return z_loan(pub_); +} + +///============================================================================= +bool PublisherData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +std::shared_ptr PublisherData::events_mgr() const +{ + std::lock_guard lock(mutex_); + return events_mgr_; +} + +///============================================================================= +PublisherData::~PublisherData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing publisher /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +///============================================================================= +rmw_ret_t PublisherData::shutdown() +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + // Unregister this node from the ROS graph. + zc_liveliness_undeclare_token(z_move(token_)); + if (pub_cache_.has_value()) { + z_drop(z_move(pub_cache_.value())); + } + z_undeclare_publisher(z_move(pub_)); + + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +// Check if the Node is shutdown. +bool PublisherData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp new file mode 100644 index 00000000..02f9ad64 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -0,0 +1,116 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_PUBLISHER_DATA_HPP_ +#define DETAIL__RMW_PUBLISHER_DATA_HPP_ + +#include + +#include +#include +#include +#include + +#include "event.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "type_support_common.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +class PublisherData final +{ +public: + // Make a shared_ptr of PublisherData. + static std::shared_ptr make( + z_session_t session, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t publisher_id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Get a copy of the actual qos_profile used by this publisher. + rmw_qos_profile_t adapted_qos_profile() const; + + // Publish a ROS message. + rmw_ret_t publish( + const void * ros_message, + std::optional & shm_manager); + + // Get the entity from this PublisherData. + std::shared_ptr entity() const; + + // Get the next sequence number. + size_t get_next_sequence_number(); + + // Get the GID of this PublisherData. + const uint8_t * gid() const; + + // Get the Zenoh publisher. + z_publisher_t publisher() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Get the events manager of this PublisherData. + std::shared_ptr events_mgr() const; + + // Shutdown this PublisherData. + rmw_ret_t shutdown(); + + // Check if this PublisherData is shutdown. + bool is_shutdown() const; + + // Destructor. + ~PublisherData(); + +private: + // Constructor. + PublisherData(); + + // Internal mutex. + mutable std::mutex mutex_; + // The parent node + const rmw_node_t * rmw_node_; + // The Entity generated for the publisher. + std::shared_ptr entity_; + uint8_t gid_[RMW_GID_STORAGE_SIZE]; + // An owned publisher. + z_owned_publisher_t pub_; + // Optional publication cache when durability is transient_local. + std::optional pub_cache_; + // Liveliness token for the publisher. + zc_owned_liveliness_token_t token_; + // Type support fields + const void * type_support_impl_; + std::unique_ptr type_support_; + std::shared_ptr events_mgr_; + size_t sequence_number_; + // Shutdown flag. + bool is_shutdown_; +}; +using PublisherDataPtr = std::shared_ptr; +using PublisherDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_PUBLISHER_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 3e350542..75f03939 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -22,6 +22,7 @@ #include "detail/identifier.hpp" #include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" +#include "detail/rmw_publisher_data.hpp" extern "C" @@ -38,14 +39,16 @@ rmw_publisher_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher->implementation_identifier, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_publisher_data_t * pub_data = - static_cast(publisher->data); - RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + rmw_node_t * node = + static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = + static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->entity, RMW_RET_INVALID_ARGUMENT); - + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); + auto pub_data = node_data->get_pub_data(publisher); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); if (publisher->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { RMW_SET_ERROR_MSG("Publisher implementation identifier not from this implementation"); return RMW_RET_INCORRECT_RMW_IMPLEMENTATION; @@ -60,12 +63,14 @@ rmw_publisher_event_init( } rmw_event->implementation_identifier = publisher->implementation_identifier; - rmw_event->data = &pub_data->events_mgr; + // TODO(Yadunund): This is still subject to race conditions. Should we + // set data to context_impl which is assumed to outlast all entities? + rmw_event->data = pub_data->events_mgr().get(); rmw_event->event_type = event_type; // Register the event with graph cache. context_impl->graph_cache()->set_qos_event_callback( - pub_data->entity, + pub_data->entity(), zenoh_event_type, [pub_data, zenoh_event_type](std::unique_ptr zenoh_event) @@ -73,7 +78,7 @@ rmw_publisher_event_init( if (pub_data == nullptr) { return; } - pub_data->events_mgr.add_new_event( + pub_data->events_mgr()->add_new_event( zenoh_event_type, std::move(zenoh_event)); } @@ -164,7 +169,7 @@ rmw_event_set_callback( return RMW_RET_ERROR; } - // Both rmw_subscription_data_t and rmw_publisher_data_t store an EventsManager object. + // rmw_event->data holds a raw_ptr to the EventsManager. rmw_zenoh_cpp::EventsManager * event_data = static_cast(rmw_event->data); RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index aa445ef4..dab5c9b1 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -18,7 +18,6 @@ #include #include -#include #include #include #include @@ -329,22 +328,6 @@ rmw_fini_publisher_allocation( return RMW_RET_UNSUPPORTED; } -namespace -{ -void -generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]) -{ - std::random_device dev; - std::mt19937 rng(dev()); - std::uniform_int_distribution dist( - std::numeric_limits::min(), std::numeric_limits::max()); - - for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { - gid[i] = dist(rng); - } -} -} // namespace - //============================================================================== /// Create a publisher and return a handle to that publisher. rmw_publisher_t * @@ -416,6 +399,10 @@ rmw_create_publisher( } rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); + return nullptr; + } // Create the publisher. auto rmw_publisher = @@ -430,69 +417,30 @@ rmw_create_publisher( allocator->deallocate(rmw_publisher, allocator->state); }); - auto publisher_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_publisher_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - publisher_data, - "failed to allocate memory for publisher data", + node_data, + "NodeData not found.", return nullptr); - auto free_publisher_data = rcpputils::make_scope_exit( - [publisher_data, allocator]() { - allocator->deallocate(publisher_data, allocator->state); - }); - RMW_TRY_PLACEMENT_NEW( - publisher_data, publisher_data, return nullptr, - rmw_zenoh_cpp::rmw_publisher_data_t); - auto destruct_publisher_data = rcpputils::make_scope_exit( - [publisher_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - publisher_data->~rmw_publisher_data_t(), rmw_zenoh_cpp::rmw_publisher_data_t); - }); - - generate_random_gid(publisher_data->pub_gid); - - // Adapt any 'best available' QoS options - publisher_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - node, topic_name, &publisher_data->adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - if (RMW_RET_OK != ret) { + if (!node_data->create_pub_data( + rmw_publisher, + context_impl->session(), + context_impl->get_next_entity_id(), + topic_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - publisher_data->typesupport_identifier = type_support->typesupport_identifier; - publisher_data->type_hash = type_support->get_type_hash_func(type_support); - publisher_data->type_support_impl = type_support->data; - auto callbacks = static_cast(type_support->data); - publisher_data->type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::MessageTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - publisher_data->type_support, - "Failed to allocate MessageTypeSupport", - return nullptr); - auto free_type_support = rcpputils::make_scope_exit( - [publisher_data, allocator]() { - allocator->deallocate(publisher_data->type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - publisher_data->type_support, - publisher_data->type_support, - return nullptr, - rmw_zenoh_cpp::MessageTypeSupport, callbacks); - auto destruct_msg_type_support = rcpputils::make_scope_exit( - [publisher_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - publisher_data->type_support->~MessageTypeSupport(), - rmw_zenoh_cpp::MessageTypeSupport); - }); - - publisher_data->context = node->context; - rmw_publisher->data = publisher_data; + // Store type erased node in rmw_publisher->data so that the + // PublisherData can be safely accessed. + rmw_publisher->data = reinterpret_cast(const_cast(node)); rmw_publisher->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; rmw_publisher->options = *publisher_options; rmw_publisher->can_loan_messages = false; - rmw_publisher->topic_name = rcutils_strdup(topic_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_publisher->topic_name, @@ -503,145 +451,7 @@ rmw_create_publisher( allocator->deallocate(const_cast(rmw_publisher->topic_name), allocator->state); }); - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - publisher_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - const z_id_t zid = z_info_zid(session); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - zid, - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Publisher, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_publisher->topic_name, - publisher_data->type_support->get_name(), - type_hash_c_str, - publisher_data->adapted_qos_profile} - ); - if (publisher_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the publisher %s.", - rmw_publisher->topic_name); - return nullptr; - } - z_owned_keyexpr_t keyexpr = z_keyexpr_new( - publisher_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - - // Create a Publication Cache if durability is transient_local. - if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); - pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; - pub_cache_opts.queryable_complete = true; - // Set the queryable_prefix to the session id so that querying subscribers can specify this - // session id to obtain latest data from this specific publication caches when querying over - // the same keyexpression. - // When such a prefix is added to the PublicationCache, it listens to queries with this extra - // prefix (allowing to be queried in a unique way), but still replies with the original - // publications' key expressions. - z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(publisher_data->entity->zid().c_str()); - auto always_free_queryable_prefix = rcpputils::make_scope_exit( - [&queryable_prefix]() { - z_keyexpr_drop(z_move(queryable_prefix)); - }); - pub_cache_opts.queryable_prefix = z_loan(queryable_prefix); - publisher_data->pub_cache = ze_declare_publication_cache( - session, - z_loan(keyexpr), - &pub_cache_opts - ); - if (!publisher_data->pub_cache.has_value() || !z_check(publisher_data->pub_cache.value())) { - RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); - return nullptr; - } - } - auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [publisher_data]() { - if (publisher_data && publisher_data->pub_cache.has_value()) { - z_drop(z_move(publisher_data->pub_cache.value())); - } - }); - - // Set congestion_control to BLOCK if appropriate. - z_publisher_options_t opts = z_publisher_options_default(); - opts.congestion_control = Z_CONGESTION_CONTROL_DROP; - if (publisher_data->adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && - publisher_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) - { - opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; - } - // TODO(clalancette): What happens if the key name is a valid but empty string? - publisher_data->pub = z_declare_publisher( - session, - z_loan(keyexpr), - &opts - ); - if (!z_check(publisher_data->pub)) { - RMW_SET_ERROR_MSG("unable to create zenoh publisher"); - return nullptr; - } - auto undeclare_z_publisher = rcpputils::make_scope_exit( - [publisher_data]() { - z_undeclare_publisher(z_move(publisher_data->pub)); - }); - - publisher_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(publisher_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [publisher_data]() { - if (publisher_data != nullptr) { - z_drop(z_move(publisher_data->token)); - } - }); - if (!z_check(publisher_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the publisher."); - return nullptr; - } - - free_token.cancel(); - undeclare_z_publisher_cache.cancel(); - undeclare_z_publisher.cancel(); free_topic_name.cancel(); - destruct_msg_type_support.cancel(); - free_type_support.cancel(); - destruct_publisher_data.cancel(); - free_publisher_data.cancel(); free_rmw_publisher.cancel(); return rmw_publisher; @@ -658,7 +468,6 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) rmw_context_impl_s * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -669,34 +478,23 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) publisher->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - rmw_ret_t ret = RMW_RET_OK; + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + auto pub_data = node_data->get_pub_data(publisher); + if (pub_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + // Remove any event callbacks registered to this publisher. + context_impl->graph_cache()->remove_qos_event_callbacks(pub_data->entity()); + // Remove the PublisherData from NodeData. + node_data->delete_pub_data(publisher); rcutils_allocator_t * allocator = &node->context->options.allocator; - - auto publisher_data = static_cast(publisher->data); - if (publisher_data != nullptr) { - zc_liveliness_undeclare_token(z_move(publisher_data->token)); - if (publisher_data->pub_cache.has_value()) { - z_drop(z_move(publisher_data->pub_cache.value())); - } - RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); - allocator->deallocate(publisher_data->type_support, allocator->state); - if (z_undeclare_publisher(z_move(publisher_data->pub))) { - RMW_SET_ERROR_MSG("failed to undeclare pub"); - ret = RMW_RET_ERROR; - } - - // Remove any event callbacks registered to this publisher. - context_impl->graph_cache()->remove_qos_event_callbacks(publisher_data->entity); - - RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); - allocator->deallocate(publisher_data, allocator->state); - } allocator->deallocate(const_cast(publisher->topic_name), allocator->state); allocator->deallocate(publisher, allocator->state); - - return ret; + return RMW_RET_OK; } //============================================================================== @@ -773,7 +571,7 @@ rmw_return_loaned_message_from_publisher( namespace { z_owned_bytes_map_t -create_map_and_set_sequence_num(int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) +create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE]) { z_owned_bytes_map_t map = z_bytes_map_new(); if (!z_check(map)) { @@ -833,113 +631,26 @@ rmw_publish( RMW_CHECK_FOR_NULL_WITH_MSG( ros_message, "ros message handle is null", return RMW_RET_INVALID_ARGUMENT); - - auto publisher_data = static_cast(publisher->data); + rmw_node_t * rmw_node = static_cast(publisher->data); RMW_CHECK_FOR_NULL_WITH_MSG( - publisher_data, "publisher_data is null", + rmw_node, "publisher_data is null", return RMW_RET_INVALID_ARGUMENT); - - rcutils_allocator_t * allocator = &(publisher_data->context->options.allocator); - - // Serialize data. - size_t max_data_length = publisher_data->type_support->get_estimated_serialized_size( - ros_message, - publisher_data->type_support_impl); - - // To store serialized message byte array. - char * msg_bytes = nullptr; - std::optional shmbuf = std::nullopt; - auto always_free_shmbuf = rcpputils::make_scope_exit( - [&shmbuf]() { - if (shmbuf.has_value()) { - zc_shmbuf_drop(&shmbuf.value()); - } - }); - auto free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator, &shmbuf]() { - if (msg_bytes && !shmbuf.has_value()) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); - - // Get memory from SHM buffer if available. - if (publisher_data->context->impl->shm_manager().has_value() && - zc_shm_manager_check(&publisher_data->context->impl->shm_manager().value())) - { - shmbuf = zc_shm_alloc( - &publisher_data->context->impl->shm_manager().value(), - max_data_length); - if (!z_check(shmbuf.value())) { - zc_shm_gc(&publisher_data->context->impl->shm_manager().value()); - shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager().value(), max_data_length); - if (!z_check(shmbuf.value())) { - // TODO(Yadunund): Should we revert to regular allocation and not return an error? - RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); - return RMW_RET_ERROR; - } - } - msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf.value())); - } else { - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - } - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr ser(fastbuffer); - if (!publisher_data->type_support->serialize_ros_message( - ros_message, - ser.get_cdr(), - publisher_data->type_support_impl)) - { - RMW_SET_ERROR_MSG("could not serialize ROS message"); - return RMW_RET_ERROR; - } - - const size_t data_length = ser.get_serialized_data_length(); - - int64_t sequence_number = publisher_data->get_next_sequence_number(); - - z_owned_bytes_map_t map = - create_map_and_set_sequence_num(sequence_number, publisher_data->pub_gid); - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - int ret; - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options = z_publisher_put_options_default(); - options.attachment = z_bytes_map_as_attachment(&map); - - if (shmbuf.has_value()) { - zc_shmbuf_set_length(&shmbuf.value(), data_length); - zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value())); - ret = zc_publisher_put_owned(z_loan(publisher_data->pub), z_move(payload), &options); - } else { - // Returns 0 if success. - ret = z_publisher_put( - z_loan(publisher_data->pub), - reinterpret_cast(msg_bytes), - data_length, - &options); + rmw_context_impl_s * context_impl = static_cast(rmw_node->context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, "context_impl is null", + return RMW_RET_INVALID_ARGUMENT); + auto node_data = context_impl->get_node_data(rmw_node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } - if (ret) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; + auto pub_data = node_data->get_pub_data(publisher); + if (pub_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; } - return RMW_RET_OK; + return pub_data->publish( + ros_message, + context_impl->shm_manager()); } //============================================================================== @@ -971,16 +682,19 @@ rmw_publisher_count_matched_subscriptions( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(subscription_count, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_publisher_data_t * pub_data = - static_cast(publisher->data); - RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + rmw_node_t * node = + static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = + static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); + auto pub_data = node_data->get_pub_data(publisher); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); return context_impl->graph_cache()->publisher_count_matched_subscriptions( - publisher, subscription_count); + pub_data, subscription_count); } //============================================================================== @@ -997,12 +711,18 @@ rmw_publisher_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_publisher_data_t * pub_data = - static_cast(publisher->data); + rmw_node_t * node = + static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = + static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); + auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - *qos = pub_data->adapted_qos_profile; + *qos = pub_data->adapted_qos_profile(); return RMW_RET_OK; } @@ -1024,11 +744,16 @@ rmw_publish_serialized_message( RMW_CHECK_FOR_NULL_WITH_MSG( serialized_message, "serialized message handle is null", return RMW_RET_INVALID_ARGUMENT); - - auto publisher_data = static_cast(publisher->data); - RCUTILS_CHECK_FOR_NULL_WITH_MSG( - publisher_data, "publisher data pointer is null", - return RMW_RET_ERROR); + rmw_node_t * node = + static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = + static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); + auto publisher_data = node_data->get_pub_data(publisher); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher_data, RMW_RET_INVALID_ARGUMENT); eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); @@ -1041,7 +766,7 @@ rmw_publish_serialized_message( uint64_t sequence_number = publisher_data->get_next_sequence_number(); z_owned_bytes_map_t map = - create_map_and_set_sequence_num(sequence_number, publisher_data->pub_gid); + create_map_and_set_sequence_num(sequence_number, publisher_data->gid()); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error @@ -1061,7 +786,7 @@ rmw_publish_serialized_message( options.attachment = z_bytes_map_as_attachment(&map); // Returns 0 if success. int8_t ret = z_publisher_put( - z_loan(publisher_data->pub), + publisher_data->publisher(), serialized_message->buffer, data_length, &options); @@ -1099,11 +824,18 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) RMW_CHECK_FOR_NULL_WITH_MSG( publisher->data, "publisher data is null", return RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_publisher_data_t * pub_data = - static_cast(publisher->data); + rmw_node_t * node = + static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = + static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); + auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (!zc_liveliness_token_check(&pub_data->token)) { + if (!pub_data->liveliness_is_valid()) { return RMW_RET_ERROR; } @@ -2114,7 +1846,7 @@ rmw_create_client( rmw_zenoh_cpp::rmw_client_data_t); }); - generate_random_gid(client_data->client_gid); + rmw_zenoh_cpp::generate_random_gid(client_data->client_gid); // Adapt any 'best available' QoS options client_data->adapted_qos_profile = *qos_profile; @@ -3775,13 +3507,19 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) { RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(gid, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_publisher_data_t * pub_data = - static_cast(publisher->data); + rmw_node_t * node = + static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = + static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + auto node_data = context_impl->get_node_data(node); + RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT); + auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(gid->data, pub_data->pub_gid, RMW_GID_STORAGE_SIZE); + memcpy(gid->data, pub_data->gid(), RMW_GID_STORAGE_SIZE); return RMW_RET_OK; }