Skip to content

Commit

Permalink
store adapted_qos_profiles in pub/sub/client/service
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 18, 2024
1 parent 48bb3e5 commit 2099b8b
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 55 deletions.
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ void sub_data_handler(
{
std::lock_guard<std::mutex> lock(sub_data->message_queue_mutex);

if (sub_data->message_queue.size() >= sub_data->queue_depth) {
if (sub_data->message_queue.size() >= sub_data->adapted_qos_profile.depth) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Message queue depth of %ld reached, discarding oldest message "
"for subscription for %s",
sub_data->queue_depth,
sub_data->adapted_qos_profile.depth,
z_loan(keystr));

std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.back());
Expand Down
17 changes: 14 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ struct rmw_publisher_data_t
// Optional publication cache when durability is transient_local.
ze_owned_publication_cache_t pub_cache;

// Store the actual QoS profile used to configure this publisher.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

Expand Down Expand Up @@ -117,6 +120,9 @@ struct rmw_subscription_data_t
// An owned subscriber or querying_subscriber depending on the QoS settings.
std::variant<z_owned_subscriber_t, ze_owned_querying_subscriber_t> sub;

// Store the actual QoS profile used to configure this subscription.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the subscription.
zc_owned_liveliness_token_t token;

Expand All @@ -128,9 +134,6 @@ struct rmw_subscription_data_t
std::deque<std::unique_ptr<saved_msg_data>> message_queue;
std::mutex message_queue_mutex;

size_t queue_depth;
bool reliable;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};
};
Expand All @@ -151,6 +154,10 @@ struct rmw_service_data_t
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

// Store the actual QoS profile used to configure this service.
// The QoS is reused for getting requests and sending responses.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the service.
zc_owned_liveliness_token_t token;

Expand Down Expand Up @@ -181,6 +188,10 @@ struct rmw_client_data_t
z_owned_keyexpr_t keyexpr;
z_owned_closure_reply_t zn_closure_reply;

// Store the actual QoS profile used to configure this client.
// The QoS is reused for sending requests and getting responses.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the client.
zc_owned_liveliness_token_t token;

Expand Down
136 changes: 86 additions & 50 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,6 @@ rmw_create_publisher(
"[rmw_create_publisher] %s",
topic_name);

// Adapt any 'best available' QoS options
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher(
node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic);
if (RMW_RET_OK != ret) {
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);
if (publisher_options->require_unique_network_flow_endpoints ==
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED)
Expand Down Expand Up @@ -529,6 +522,21 @@ rmw_create_publisher(
allocator->deallocate(publisher_data, allocator->state);
});

RMW_TRY_PLACEMENT_NEW(publisher_data, publisher_data, return nullptr, rmw_publisher_data_t);
auto destruct_pub_data = rcpputils::make_scope_exit(
[publisher_data]() {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_data->~rmw_publisher_data_t(),
rmw_publisher_data_t);
});

// Adapt any 'best available' QoS options
publisher_data->adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher(
node, topic_name, &publisher_data->adapted_qos_profile, rmw_get_subscriptions_info_by_topic);
if (RMW_RET_OK != ret) {
return nullptr;
}
publisher_data->typesupport_identifier = type_support->typesupport_identifier;
publisher_data->type_support_impl = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
Expand Down Expand Up @@ -590,9 +598,9 @@ rmw_create_publisher(
}

// Create a Publication Cache if durability is transient_local.
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = adapted_qos_profile.depth;
pub_cache_opts.history = publisher_data->adapted_qos_profile.depth;
publisher_data->pub_cache = ze_declare_publication_cache(
z_loan(context_impl->session),
z_loan(keyexpr),
Expand All @@ -612,8 +620,8 @@ rmw_create_publisher(

// Set congestion_control to BLOCK if appropriate.
z_publisher_options_t opts = z_publisher_options_default();
if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE)
if (publisher_data->adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
publisher_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE)
{
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
} else {
Expand Down Expand Up @@ -656,7 +664,7 @@ rmw_create_publisher(
liveliness::EntityType::Publisher,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_publisher->topic_name,
publisher_data->type_support->get_name(), adapted_qos_profile}
publisher_data->type_support->get_name(), publisher_data->adapted_qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -686,6 +694,7 @@ rmw_create_publisher(
undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();
free_topic_name.cancel();
destruct_pub_data.cancel();
destruct_msg_type_support.cancel();
free_type_support.cancel();
free_publisher_data.cancel();
Expand Down Expand Up @@ -747,6 +756,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
RMW_SET_ERROR_MSG("failed to undeclare pub");
ret = RMW_RET_ERROR;
}
RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, );
allocator->deallocate(publisher_data, allocator->state);
}
allocator->deallocate(const_cast<char *>(publisher->topic_name), allocator->state);
Expand Down Expand Up @@ -984,9 +994,18 @@ rmw_publisher_get_actual_qos(
const rmw_publisher_t * publisher,
rmw_qos_profile_t * qos)
{
static_cast<void>(publisher);
static_cast<void>(qos);
// TODO(yadunund): Fixme.
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

rmw_publisher_data_t * pub_data = static_cast<rmw_publisher_data_t *>(publisher->data);
RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT);

*qos = pub_data->adapted_qos_profile;
return RMW_RET_OK;
}

Expand Down Expand Up @@ -1188,14 +1207,6 @@ rmw_create_subscription(
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr);
// Adapt any 'best available' QoS options
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription(
node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic);
if (RMW_RET_OK != ret) {
RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile.");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);

RCUTILS_LOG_INFO_NAMED(
Expand Down Expand Up @@ -1268,6 +1279,15 @@ rmw_create_subscription(
rmw_subscription_data_t);
});

// Adapt any 'best available' QoS options
sub_data->adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription(
node, topic_name, &sub_data->adapted_qos_profile, rmw_get_publishers_info_by_topic);
if (RMW_RET_OK != ret) {
RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile.");
return nullptr;
}

sub_data->typesupport_identifier = type_support->typesupport_identifier;
sub_data->type_support_impl = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
Expand All @@ -1294,7 +1314,6 @@ rmw_create_subscription(
MessageTypeSupport);
});

sub_data->queue_depth = qos_profile->depth;
sub_data->context = node->context;

rmw_subscription->implementation_identifier = rmw_zenoh_identifier;
Expand Down Expand Up @@ -1344,12 +1363,10 @@ rmw_create_subscription(
z_drop(z_move(owned_key_str));
});

sub_data->reliable = false;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default();
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
if (sub_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
}
sub_data->sub = ze_declare_querying_subscriber(
z_loan(context_impl->session),
Expand All @@ -1368,7 +1385,6 @@ rmw_create_subscription(
z_subscriber_options_t sub_options = z_subscriber_options_default();
if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
sub_options.reliability = Z_RELIABILITY_RELIABLE;
sub_data->reliable = true;
}
sub_data->sub = z_declare_subscriber(
z_loan(context_impl->session),
Expand Down Expand Up @@ -1403,7 +1419,7 @@ rmw_create_subscription(
liveliness::EntityType::Subscription,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_subscription->topic_name,
sub_data->type_support->get_name(), adapted_qos_profile}
sub_data->type_support->get_name(), sub_data->adapted_qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -1528,10 +1544,7 @@ rmw_subscription_get_actual_qos(
auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE :
RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
qos->durability = std::holds_alternative<ze_owned_querying_subscriber_t>(sub_data->sub) ?
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE;
*qos = sub_data->adapted_qos_profile;
return RMW_RET_OK;
}

Expand Down Expand Up @@ -1863,6 +1876,10 @@ rmw_create_client(
rmw_client_data_t);
});

// Adapt any 'best available' QoS options
client_data->adapted_qos_profile =
rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile);

// Obtain the type support
const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);
if (type_support == nullptr) {
Expand Down Expand Up @@ -1973,7 +1990,7 @@ rmw_create_client(
liveliness::EntityType::Client,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_client->service_name,
std::move(service_type), *qos_profile}
std::move(service_type), client_data->adapted_qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -2320,9 +2337,18 @@ rmw_client_request_publisher_get_actual_qos(
const rmw_client_t * client,
rmw_qos_profile_t * qos)
{
// TODO(francocipollone): Fix.
static_cast<void>(client);
static_cast<void>(qos);
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client,
client->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

rmw_client_data_t * client_data = static_cast<rmw_client_data_t *>(client->data);
RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT);

*qos = client_data->adapted_qos_profile;
return RMW_RET_OK;
}

Expand All @@ -2333,10 +2359,8 @@ rmw_client_response_subscription_get_actual_qos(
const rmw_client_t * client,
rmw_qos_profile_t * qos)
{
// TODO(francocipollone): Fix.
static_cast<void>(client);
static_cast<void>(qos);
return RMW_RET_OK;
// The same QoS profile is used for sending requests and receiving responses.
return rmw_client_request_publisher_get_actual_qos(client, qos);
}

//==============================================================================
Expand Down Expand Up @@ -2435,6 +2459,11 @@ rmw_create_service(
rmw_service_data_t);
});

// Adapt any 'best available' QoS options
service_data->adapted_qos_profile =
rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles);


// Get the RMW type support.
const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports);
if (type_support == nullptr) {
Expand Down Expand Up @@ -2561,7 +2590,7 @@ rmw_create_service(
liveliness::EntityType::Service,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_service->service_name,
std::move(service_type), *qos_profiles}
std::move(service_type), service_data->adapted_qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -2852,9 +2881,18 @@ rmw_service_request_subscription_get_actual_qos(
const rmw_service_t * service,
rmw_qos_profile_t * qos)
{
// TODO(yadunund): Fix.
static_cast<void>(service);
static_cast<void>(qos);
RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
service,
service->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

rmw_service_data_t * service_data = static_cast<rmw_service_data_t *>(service->data);
RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT);

*qos = service_data->adapted_qos_profile;
return RMW_RET_OK;
}

Expand All @@ -2865,10 +2903,8 @@ rmw_service_response_publisher_get_actual_qos(
const rmw_service_t * service,
rmw_qos_profile_t * qos)
{
// TODO(yadunund): Fix.
static_cast<void>(service);
static_cast<void>(qos);
return RMW_RET_OK;
// The same QoS profile is used for receiving requests and sending responses.
return rmw_service_request_subscription_get_actual_qos(service, qos);
}

//==============================================================================
Expand Down

0 comments on commit 2099b8b

Please sign in to comment.