Skip to content

Commit

Permalink
Set publisher congestion when appropriate
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 15, 2024
1 parent e1d90d7 commit ac6c8d6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
12 changes: 7 additions & 5 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char SLASH_REPLACEMENT = '%';
static const char QOS_DELIMITER = ':';
static const char QOS_HISTORY_DELIMITER = ',';

static const std::unordered_map<EntityType, std::string> entity_to_str = {
{EntityType::Node, NODE_STR},
Expand Down Expand Up @@ -140,7 +141,7 @@ std::vector<std::string> split_keyexpr(
* Where:
* <ReliabilityKind> - "reliable" or "best_effort".
* <DurabilityKind> - "volatile" or "transient_local".
* <HistoryKind> - "keep_last".
* <HistoryKind> - "keep_all" or "keep_last".
* <HistoryDepth> - The depth number.
*/
// TODO(Yadunund): Rely on maps to retrieve strings.
Expand All @@ -152,8 +153,8 @@ std::string qos_to_keyexpr(rmw_qos_profile_t qos)
keyexpr += qos.durability ==
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile";
keyexpr += QOS_DELIMITER;
// TODO(Yadunund): Update once we properly support History.
keyexpr += "keep_last,";
keyexpr += qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL ? "keep_all" : "keep_last";
keyexpr += QOS_HISTORY_DELIMITER;
keyexpr += std::to_string(qos.depth);
return keyexpr;
}
Expand All @@ -171,11 +172,12 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
qos.durability = parts[1] ==
"transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL :
RMW_QOS_POLICY_DURABILITY_VOLATILE;
const std::vector<std::string> history_parts = split_keyexpr(parts[2], ',');
const std::vector<std::string> history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER);
if (history_parts.size() < 2) {
return std::nullopt;
}
qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
qos.history = history_parts[0] ==
"keep_all" ? RMW_QOS_POLICY_HISTORY_KEEP_ALL : RMW_QOS_POLICY_HISTORY_KEEP_LAST;
sscanf(history_parts[1].c_str(), "%zu", &qos.depth);

return qos;
Expand Down
11 changes: 10 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,20 @@ rmw_create_publisher(
}
});

// Set congestion_control to BLOCK if appropriate.
z_publisher_options_t opts = z_publisher_options_default();
if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE)
{
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
} else {
opts.congestion_control = Z_CONGESTION_CONTROL_DROP;
}
// TODO(clalancette): What happens if the key name is a valid but empty string?
publisher_data->pub = z_declare_publisher(
z_loan(context_impl->session),
z_loan(keyexpr),
NULL
&opts
);
if (!z_check(publisher_data->pub)) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher");
Expand Down

0 comments on commit ac6c8d6

Please sign in to comment.