From ac6c8d6f48d33a23a34a0e2d55a86e2fc6a303fe Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 12 Jan 2024 00:17:30 +0800 Subject: [PATCH] Set publisher congestion when appropriate Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 12 +++++++----- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 11 ++++++++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 66aab243..00cb272c 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -68,6 +68,7 @@ static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; static const char SLASH_REPLACEMENT = '%'; static const char QOS_DELIMITER = ':'; +static const char QOS_HISTORY_DELIMITER = ','; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -140,7 +141,7 @@ std::vector split_keyexpr( * Where: * - "reliable" or "best_effort". * - "volatile" or "transient_local". - * - "keep_last". + * - "keep_all" or "keep_last". * - The depth number. */ // TODO(Yadunund): Rely on maps to retrieve strings. @@ -152,8 +153,8 @@ std::string qos_to_keyexpr(rmw_qos_profile_t qos) keyexpr += qos.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile"; keyexpr += QOS_DELIMITER; - // TODO(Yadunund): Update once we properly support History. - keyexpr += "keep_last,"; + keyexpr += qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL ? "keep_all" : "keep_last"; + keyexpr += QOS_HISTORY_DELIMITER; keyexpr += std::to_string(qos.depth); return keyexpr; } @@ -171,11 +172,12 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) qos.durability = parts[1] == "transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; - const std::vector history_parts = split_keyexpr(parts[2], ','); + const std::vector history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER); if (history_parts.size() < 2) { return std::nullopt; } - qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + qos.history = history_parts[0] == + "keep_all" ? RMW_QOS_POLICY_HISTORY_KEEP_ALL : RMW_QOS_POLICY_HISTORY_KEEP_LAST; sscanf(history_parts[1].c_str(), "%zu", &qos.depth); return qos; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 9a3854d6..e70321a9 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -607,11 +607,20 @@ rmw_create_publisher( } }); + // Set congestion_control to BLOCK if appropriate. + z_publisher_options_t opts = z_publisher_options_default(); + if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } else { + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( z_loan(context_impl->session), z_loan(keyexpr), - NULL + &opts ); if (!z_check(publisher_data->pub)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher");