diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 66aab243..ea1be9f0 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -68,6 +68,7 @@ static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; static const char SLASH_REPLACEMENT = '%'; static const char QOS_DELIMITER = ':'; +static const char QOS_HISTORY_DELIMITER = ','; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -140,7 +141,7 @@ std::vector split_keyexpr( * Where: * - "reliable" or "best_effort". * - "volatile" or "transient_local". - * - "keep_last". + * - "keep_all" or "keep_last". * - The depth number. */ // TODO(Yadunund): Rely on maps to retrieve strings. @@ -152,8 +153,8 @@ std::string qos_to_keyexpr(rmw_qos_profile_t qos) keyexpr += qos.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile"; keyexpr += QOS_DELIMITER; - // TODO(Yadunund): Update once we properly support History. - keyexpr += "keep_last,"; + keyexpr += qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL ? "keep_all" : "keep_last"; + keyexpr += QOS_HISTORY_DELIMITER; keyexpr += std::to_string(qos.depth); return keyexpr; } @@ -171,7 +172,7 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) qos.durability = parts[1] == "transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; - const std::vector history_parts = split_keyexpr(parts[2], ','); + const std::vector history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER); if (history_parts.size() < 2) { return std::nullopt; } diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 50f08fa6..fd041a27 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -606,11 +606,20 @@ rmw_create_publisher( } }); + // Set congestion_control to BLOCK if appropriate. + z_publisher_options_t opts = z_publisher_options_default(); + if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } else { + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( z_loan(context_impl->session), z_loan(keyexpr), - NULL + &opts ); if (!z_check(publisher_data->pub)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher");