Skip to content

Commit

Permalink
Make transient_local work
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 18, 2024
1 parent 9d49ef3 commit b7a27be
Showing 1 changed file with 32 additions and 8 deletions.
40 changes: 32 additions & 8 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ rmw_create_publisher(
return nullptr;
}
}

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[rmw_create_publisher] %s",
topic_name);

// Adapt any 'best available' QoS options
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher(
Expand Down Expand Up @@ -584,7 +590,6 @@ rmw_create_publisher(
}

// Create a Publication Cache if durability is transient_local.
publisher_data->pub_cache = ze_publication_cache_null();
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = adapted_qos_profile.depth;
Expand Down Expand Up @@ -733,6 +738,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
// return RMW_RET_ERROR;
// }
z_drop(z_move(publisher_data->token));
publisher_data->pub_cache = ze_publication_cache_null();
z_drop(z_move(publisher_data->pub_cache));

RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
Expand Down Expand Up @@ -1192,6 +1198,11 @@ rmw_create_subscription(
}
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[rmw_create_subscription] %s",
topic_name);

const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports);
if (type_support == nullptr) {
// error was already set by find_message_type_support
Expand Down Expand Up @@ -1327,13 +1338,18 @@ rmw_create_subscription(
// adapted_qos_profile.
// TODO(Yadunund): Rely on a separate function to return the sub
// as we start supporting more qos settings.
z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr));
auto always_drop_keystr = rcpputils::make_scope_exit(
[&owned_key_str]() {
z_drop(z_move(owned_key_str));
});

sub_data->reliable = false;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default();
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE;
}
sub_data->sub = ze_declare_querying_subscriber(
z_loan(context_impl->session),
Expand All @@ -1345,6 +1361,7 @@ rmw_create_subscription(
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
}
printf("Created querying sub %s\n", owned_key_str._cstr);
}
// Create a regular subscriber for all other durability settings.
else {
Expand All @@ -1363,14 +1380,21 @@ rmw_create_subscription(
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
return nullptr;
}
printf("Created regular sub %s\n", owned_key_str._cstr);
}

auto undeclare_z_sub = rcpputils::make_scope_exit(
[sub_data]() {
// TODO(Yadunund): Check if this is okay or if it is better
// to cast into explicit types and call appropriate undeclare method.
// See rmw_destroy_subscription()
z_drop(z_move(sub_data->sub));
z_owned_subscriber_t * sub = std::get_if<z_owned_subscriber_t>(&sub_data->sub);
if (sub == nullptr || z_undeclare_subscriber(sub)) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
} else {
ze_owned_querying_subscriber_t * querying_sub =
std::get_if<ze_owned_querying_subscriber_t>(&sub_data->sub);
if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) {
RMW_SET_ERROR_MSG("failed to undeclare sub");
}
}
});

// Publish to the graph that a new subscription is in town
Expand Down Expand Up @@ -3028,7 +3052,7 @@ rmw_wait(
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

// TODO(yadunund): Switch to debug log level.
RCUTILS_LOG_WARN_NAMED(
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"[rmw_wait] %ld subscriptions, %ld services, %ld clients, %ld events, %ld guard conditions",
subscriptions->subscriber_count,
Expand All @@ -3039,7 +3063,7 @@ rmw_wait(

// TODO(yadunund): Switch to debug log level.
if (wait_timeout) {
RCUTILS_LOG_WARN_NAMED(
RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_common_cpp", "[rmw_wait] TIMEOUT: %ld s %ld ns",
wait_timeout->sec,
wait_timeout->nsec);
Expand Down

0 comments on commit b7a27be

Please sign in to comment.