From e1d90d72750347b2740eb3e2d5faac2c06f86705 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 11 Jan 2024 23:58:53 +0800 Subject: [PATCH] Initialize pubcache only if durability is transient local Signed-off-by: Yadunund --- .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 2 +- .../DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 | 2 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 3 +++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 25 +++++++++++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 434de9fe..31803a90 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -69,7 +69,7 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - enabled: { router: true, peer: false, client: false }, + enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 43550e1d..ea13cc64 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -69,7 +69,7 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - enabled: { router: true, peer: false, client: false }, + enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 3df33b9d..e9e6f471 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -74,6 +74,9 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; + // Optional publication cache when durability is transient_local. + ze_owned_publication_cache_t pub_cache; + // Liveliness token for the publisher. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index c1c03041..9a3854d6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -584,6 +584,29 @@ rmw_create_publisher( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + + // Create a Publication Cache if durability is transient_local. + publisher_data->pub_cache = ze_publication_cache_null(); + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = adapted_qos_profile.depth; + publisher_data->pub_cache = ze_declare_publication_cache( + z_loan(context_impl->session), + z_loan(keyexpr), + &pub_cache_opts + ); + if (!z_check(publisher_data->pub_cache)) { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } + } + auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( + [publisher_data]() { + if (publisher_data) { + z_drop(z_move(publisher_data->pub_cache)); + } + }); + // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( z_loan(context_impl->session), @@ -648,6 +671,7 @@ rmw_create_publisher( } free_token.cancel(); + undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -702,6 +726,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // return RMW_RET_ERROR; // } z_drop(z_move(publisher_data->token)); + z_drop(z_move(publisher_data->pub_cache)); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state);