From 6f0186ddfa4f72391c9df096b03431a8a09f3530 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 1 Oct 2024 05:27:45 +0800 Subject: [PATCH] Reuse methods and make publish_serialized_message also thread safe Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + .../src/detail/rmw_publisher_data.cpp | 111 ++++++++---------- .../src/detail/rmw_publisher_data.hpp | 9 +- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 68 +++++++++++ rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 30 +++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 96 ++------------- 6 files changed, 161 insertions(+), 154 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/zenoh_utils.cpp create mode 100644 rmw_zenoh_cpp/src/detail/zenoh_utils.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 631bf6cb..bd274a70 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -47,6 +47,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/type_support_common.cpp src/detail/zenoh_config.cpp src/detail/zenoh_router_check.cpp + src/detail/zenoh_utils.cpp src/rmw_event.cpp src/rmw_get_network_flow_endpoints.cpp src/rmw_get_node_info_and_types.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 92c17952..c598b83f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -27,6 +27,7 @@ #include "message_type_support.hpp" #include "logging_macros.hpp" #include "qos.hpp" +#include "zenoh_utils.hpp" #include "rcpputils/scope_exit.hpp" @@ -36,50 +37,6 @@ namespace rmw_zenoh_cpp { -namespace -{ -z_owned_bytes_map_t -create_map_and_set_sequence_num(int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) -{ - z_owned_bytes_map_t map = z_bytes_map_new(); - if (!z_check(map)) { - RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); - return z_bytes_map_null(); - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. - // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char seq_id_str[20]; - if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); - return z_bytes_map_null(); - } - z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); - - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - char source_ts_str[20]; - if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); - return z_bytes_map_null(); - } - z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); - - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = gid; - - z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes); - - free_attachment_map.cancel(); - - return map; -} -} // namespace ///============================================================================= std::shared_ptr PublisherData::make( z_session_t session, @@ -332,10 +289,8 @@ rmw_ret_t PublisherData::publish( const size_t data_length = ser.get_serialized_data_length(); - const int64_t sequence_number = sequence_number_++; - z_owned_bytes_map_t map = - create_map_and_set_sequence_num(sequence_number, gid_); + create_map_and_set_sequence_num(sequence_number_++, gid_); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -373,24 +328,67 @@ rmw_ret_t PublisherData::publish( } ///============================================================================= -std::size_t PublisherData::guid() const +rmw_ret_t PublisherData::publish_serialized_message( + const rmw_serialized_message_t * serialized_message, + std::optional & /*shm_manager*/) { + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); + rmw_zenoh_cpp::Cdr ser(buffer); + if (!ser.get_cdr().jump(serialized_message->buffer_length)) { + RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); + return RMW_RET_ERROR; + } + std::lock_guard lock(mutex_); - return entity_->guid(); + + z_owned_bytes_map_t map = + rmw_zenoh_cpp::create_map_and_set_sequence_num(sequence_number_++, gid_); + + if (!z_check(map)) { + // create_map_and_set_sequence_num already set the error + return RMW_RET_ERROR; + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + const size_t data_length = ser.get_serialized_data_length(); + + // The encoding is simply forwarded and is useful when key expressions in the + // session use different encoding formats. In our case, all key expressions + // will be encoded with CDR so it does not really matter. + z_publisher_put_options_t options = z_publisher_put_options_default(); + options.attachment = z_bytes_map_as_attachment(&map); + + // Returns 0 if success. + int8_t ret = z_publisher_put( + z_loan(pub_), + serialized_message->buffer, + data_length, + &options); + + if (ret) { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; } ///============================================================================= -liveliness::TopicInfo PublisherData::topic_info() const +std::size_t PublisherData::guid() const { std::lock_guard lock(mutex_); - return entity_->topic_info().value(); + return entity_->guid(); } ///============================================================================= -size_t PublisherData::get_next_sequence_number() +liveliness::TopicInfo PublisherData::topic_info() const { std::lock_guard lock(mutex_); - return sequence_number_++; + return entity_->topic_info().value(); } ///============================================================================= @@ -400,13 +398,6 @@ const uint8_t * PublisherData::gid() const return gid_; } -///============================================================================= -z_publisher_t PublisherData::publisher() const -{ - std::lock_guard lock(mutex_); - return z_loan(pub_); -} - ///============================================================================= bool PublisherData::liveliness_is_valid() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 9a2dfda9..e3fc4fc2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -59,8 +59,10 @@ class PublisherData final const void * ros_message, std::optional & shm_manager); - // Get the next sequence number. - size_t get_next_sequence_number(); + // Publish a serialized ROS message. + rmw_ret_t publish_serialized_message( + const rmw_serialized_message_t * serialized_message, + std::optional & shm_manager); // Get a copy of the GUID of this PublisherData's liveliness::Entity. std::size_t guid() const; @@ -71,9 +73,6 @@ class PublisherData final // Get the GID of this PublisherData. const uint8_t * gid() const; - // Get the Zenoh publisher. - z_publisher_t publisher() const; - // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp new file mode 100644 index 00000000..1186d28c --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -0,0 +1,68 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "zenoh_utils.hpp" + +#include +#include + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +z_owned_bytes_map_t +create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE]) +{ + z_owned_bytes_map_t map = z_bytes_map_new(); + if (!z_check(map)) { + RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); + return z_bytes_map_null(); + } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); + + // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. + // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. + char seq_id_str[20]; + if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + char source_ts_str[20]; + if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); + + z_bytes_t gid_bytes; + gid_bytes.len = RMW_GID_STORAGE_SIZE; + gid_bytes.start = gid; + + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes); + + free_attachment_map.cancel(); + + return map; +} +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp new file mode 100644 index 00000000..f3ec5206 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -0,0 +1,30 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__ZENOH_UTILS_HPP_ +#define DETAIL__ZENOH_UTILS_HPP_ + +#include + +#include "rmw/types.h" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +z_owned_bytes_map_t +create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE]); + +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__ZENOH_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index a33065d2..2b9a0938 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -39,6 +39,7 @@ #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" +#include "detail/zenoh_utils.hpp" #include "rcpputils/scope_exit.hpp" @@ -568,51 +569,6 @@ rmw_return_loaned_message_from_publisher( return RMW_RET_UNSUPPORTED; } -namespace -{ -z_owned_bytes_map_t -create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE]) -{ - z_owned_bytes_map_t map = z_bytes_map_new(); - if (!z_check(map)) { - RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); - return z_bytes_map_null(); - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. - // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char seq_id_str[20]; - if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); - return z_bytes_map_null(); - } - z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); - - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ns = std::chrono::duration_cast(now); - char source_ts_str[20]; - if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); - return z_bytes_map_null(); - } - z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); - - z_bytes_t gid_bytes; - gid_bytes.len = RMW_GID_STORAGE_SIZE; - gid_bytes.start = gid; - - z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes); - - free_attachment_map.cancel(); - - return map; -} -} // namespace - //============================================================================== /// Publish a ROS message. rmw_ret_t @@ -755,48 +711,9 @@ rmw_publish_serialized_message( auto publisher_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(publisher_data, RMW_RET_INVALID_ARGUMENT); - eprosima::fastcdr::FastBuffer buffer( - reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); - rmw_zenoh_cpp::Cdr ser(buffer); - if (!ser.get_cdr().jump(serialized_message->buffer_length)) { - RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); - return RMW_RET_ERROR; - } - - uint64_t sequence_number = publisher_data->get_next_sequence_number(); - - z_owned_bytes_map_t map = - create_map_and_set_sequence_num(sequence_number, publisher_data->gid()); - - if (!z_check(map)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - auto free_attachment_map = rcpputils::make_scope_exit( - [&map]() { - z_bytes_map_drop(z_move(map)); - }); - - const size_t data_length = ser.get_serialized_data_length(); - - // The encoding is simply forwarded and is useful when key expressions in the - // session use different encoding formats. In our case, all key expressions - // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options = z_publisher_put_options_default(); - options.attachment = z_bytes_map_as_attachment(&map); - // Returns 0 if success. - int8_t ret = z_publisher_put( - publisher_data->publisher(), - serialized_message->buffer, - data_length, - &options); - - if (ret) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; - } - - return RMW_RET_OK; + return publisher_data->publish_serialized_message( + serialized_message, + context_impl->shm_manager()); } //============================================================================== @@ -2170,7 +2087,8 @@ rmw_send_request( // Send request z_get_options_t opts = z_get_options_default(); - z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id, client_data->client_gid); + z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, + client_data->client_gid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -2861,7 +2779,7 @@ rmw_send_response( const z_query_t loaned_query = query->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); - z_owned_bytes_map_t map = create_map_and_set_sequence_num( + z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( request_header->sequence_number, request_header->writer_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error