From cf03d9c257ec3df2adeebfe628fba45c6af3fdfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Hern=C3=A1ndez=20Cordero?= Date: Wed, 20 Nov 2024 13:52:22 +0100 Subject: [PATCH] Publisher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Alejandro Hernández Cordero --- .../src/detail/attachment_helpers.cpp | 32 +++++ .../src/detail/attachment_helpers.hpp | 20 ++- .../src/detail/rmw_publisher_data.cpp | 127 +++++++----------- .../src/detail/rmw_publisher_data.hpp | 8 +- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 12 ++ rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 6 +- 6 files changed, 121 insertions(+), 84 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index cbdcfa1b..363f7f55 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -27,6 +27,38 @@ namespace rmw_zenoh_cpp { +AttachementData::AttachementData( + const int64_t _sequence_number, + const int64_t _source_timestamp, + const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]) +{ + sequence_number = _sequence_number; + source_timestamp = _source_timestamp; + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) + { + source_gid.push_back(_source_gid[RMW_GID_STORAGE_SIZE - 1 - i]); + } +} + +AttachementData::AttachementData(AttachementData && data) +{ + sequence_number = std::move(data.sequence_number); + source_timestamp = std::move(data.source_timestamp); + source_gid = data.source_gid; +} + +zenoh::Bytes AttachementData::serialize_to_zbytes() +{ + auto serializer = zenoh::ext::Serializer(); + serializer.serialize(std::string("sequence_number")); + serializer.serialize(this->sequence_number); + serializer.serialize(std::string("source_timestamp")); + serializer.serialize(this->source_timestamp); + serializer.serialize(std::string("source_gid")); + serializer.serialize(this->source_gid); + return std::move(serializer).finish(); +} + attachement_data_t::attachement_data_t( const int64_t _sequence_number, const int64_t _source_timestamp, diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 2648b667..3274318a 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -15,13 +15,31 @@ #ifndef DETAIL__ATTACHMENT_HELPERS_HPP_ #define DETAIL__ATTACHMENT_HELPERS_HPP_ -#include +#include #include "rmw/types.h" namespace rmw_zenoh_cpp { +class AttachementData final +{ +public: + explicit AttachementData( + const int64_t _sequence_number, + const int64_t _source_timestamp, + const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]); + + // explicit AttachementData(const zenoh::Bytes & bytes); + explicit AttachementData(AttachementData && data); + + int64_t sequence_number; + int64_t source_timestamp; + std::vector source_gid; + + zenoh::Bytes serialize_to_zbytes(); +}; + class attachement_data_t final { public: diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f6d63d63..b45e552f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -99,52 +99,34 @@ std::shared_ptr PublisherData::make( return nullptr; } - std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_; - z_view_keyexpr_t pub_ke; - if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - + zenoh::ZResult err; + std::optional pub_cache; + zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); // Create a Publication Cache if durability is transient_local. - std::optional pub_cache = std::nullopt; - auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [&pub_cache]() { - if (pub_cache.has_value()) { - z_drop(z_move(pub_cache.value())); - } - }); if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_publication_cache_options_t pub_cache_opts; - ze_publication_cache_options_default(&pub_cache_opts); + + zenoh::Session::PublicationCacheOptions pub_cache_opts = + zenoh::Session::PublicationCacheOptions::create_default(); + pub_cache_opts.history = adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; - // Set the queryable_prefix to the session id so that querying subscribers can specify this - // session id to obtain latest data from this specific publication caches when querying over - // the same keyexpression. - // When such a prefix is added to the PublicationCache, it listens to queries with this extra - // prefix (allowing to be queried in a unique way), but still replies with the original - // publications' key expressions. + std::string queryable_prefix = entity->zid(); - z_view_keyexpr_t prefix_ke; - z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str()); - pub_cache_opts.queryable_prefix = z_loan(prefix_ke); + pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); + + pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &err); - ze_owned_publication_cache_t pub_cache_; - if (ze_declare_publication_cache( - z_loan(session->_0), &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } - pub_cache = pub_cache_; } // Set congestion_control to BLOCK if appropriate. - z_publisher_options_t opts; - z_publisher_options_default(&opts); - opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { opts.reliability = Z_RELIABILITY_RELIABLE; @@ -152,21 +134,15 @@ std::shared_ptr PublisherData::make( opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } } - z_owned_publisher_t pub; - // TODO(clalancette): What happens if the key name is a valid but empty string? - auto undeclare_z_publisher = rcpputils::make_scope_exit( - [&pub]() { - z_undeclare_publisher(z_move(pub)); - }); - if (z_declare_publisher( - z_loan(session->_0), &pub, z_loan(pub_ke), &opts) != Z_OK) + auto pub = session->declare_publisher(pub_ke, std::move(opts), &err); + + if (err != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - zenoh::ZResult err; auto token = session->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), @@ -179,9 +155,6 @@ std::shared_ptr PublisherData::make( return nullptr; } - undeclare_z_publisher_cache.cancel(); - undeclare_z_publisher.cancel(); - return std::shared_ptr( new PublisherData{ node, @@ -198,8 +171,8 @@ std::shared_ptr PublisherData::make( PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, - z_owned_publisher_t pub, - std::optional pub_cache, + zenoh::Publisher pub, + std::optional pub_cache, zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) @@ -295,24 +268,21 @@ rmw_ret_t PublisherData::publish( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - z_owned_bytes_t attachment; uint8_t local_gid[RMW_GID_STORAGE_SIZE]; entity_->copy_gid(local_gid); - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - - z_owned_bytes_t payload; - if (shmbuf.has_value()) { - z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); - } else { - z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); - } - - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { + zenoh::ZResult err; + auto options = zenoh::Publisher::PutOptions::create_default(); + options.attachment = create_map_and_set_sequence_num(sequence_number_++, local_gid); + + // TODO(ahcorde): shmbuf + std::vector raw_image( + reinterpret_cast(msg_bytes), + reinterpret_cast(msg_bytes) + data_length); + zenoh::Bytes payload(raw_image); + + pub_.put(std::move(payload), std::move(options), &err); + if (err != Z_OK) { + if (err == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -340,27 +310,25 @@ rmw_ret_t PublisherData::publish_serialized_message( std::lock_guard lock(mutex_); - const size_t data_length = ser.get_serialized_data_length(); // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); uint8_t local_gid[RMW_GID_STORAGE_SIZE]; entity_->copy_gid(local_gid); - z_owned_bytes_t attachment; - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - - options.attachment = z_move(attachment); + zenoh::ZResult err; + auto options = zenoh::Publisher::PutOptions::create_default(); + options.attachment = create_map_and_set_sequence_num(sequence_number_++, local_gid); - z_owned_bytes_t payload; - z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); + std::vector raw_image( + serialized_message->buffer, + serialized_message->buffer + data_length); + zenoh::Bytes payload(raw_image); - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { + pub_.put(std::move(payload), std::move(options), &err); + if (err != Z_OK) { + if (err == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -369,7 +337,6 @@ rmw_ret_t PublisherData::publish_serialized_message( return RMW_RET_ERROR; } } - return RMW_RET_OK; } @@ -432,10 +399,14 @@ rmw_ret_t PublisherData::shutdown() "Unable to undeclare liveliness token"); return RMW_RET_ERROR; } - if (pub_cache_.has_value()) { - z_drop(z_move(pub_cache_.value())); + std::move(pub_).undeclare(&err); + if (err != Z_OK) + { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare publisher"); + return RMW_RET_ERROR; } - z_undeclare_publisher(z_move(pub_)); is_shutdown_ = true; return RMW_RET_OK; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index f8aeb0ad..bab55aae 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -83,8 +83,8 @@ class PublisherData final PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, - z_owned_publisher_t pub, - std::optional pub_cache, + zenoh::Publisher pub, + std::optional pub_cache, zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support); @@ -96,9 +96,9 @@ class PublisherData final // The Entity generated for the publisher. std::shared_ptr entity_; // An owned publisher. - z_owned_publisher_t pub_; + zenoh::Publisher pub_; // Optional publication cache when durability is transient_local. - std::optional pub_cache_; + std::optional pub_cache_; // Liveliness token for the publisher. std::optional token_; // Type support fields diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index f8f61a2e..6805b239 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -33,6 +33,18 @@ void create_map_and_set_sequence_num( data.serialize_to_zbytes(out_bytes); } +///============================================================================= +zenoh::Bytes create_map_and_set_sequence_num( + int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) +{ + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + int64_t source_timestamp = now_ns.count(); + + rmw_zenoh_cpp::AttachementData data(sequence_number, source_timestamp, gid); + return std::move(data.serialize_to_zbytes()); +} + ///============================================================================= ZenohQuery::ZenohQuery(z_owned_query_t query) {query_ = query;} diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 97e1446f..ecc84859 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -15,7 +15,7 @@ #ifndef DETAIL__ZENOH_UTILS_HPP_ #define DETAIL__ZENOH_UTILS_HPP_ -#include +#include #include #include @@ -30,6 +30,10 @@ create_map_and_set_sequence_num( z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]); +///============================================================================= +zenoh::Bytes create_map_and_set_sequence_num( + int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]); + ///============================================================================= // A class to store the replies to service requests. class ZenohReply final