Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize PublsiherData only after all members are valid #289

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,13 @@ rmw_ret_t NodeData::shutdown()
for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
ret = pub_it->second->shutdown();
if (ret != RMW_RET_OK) {
return ret;
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.",
pub_it->second->topic_info().name_.c_str(),
id_,
ret
);
}
}

Expand Down
82 changes: 49 additions & 33 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ std::shared_ptr<PublisherData> PublisherData::make(
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
auto pub_data = std::shared_ptr<PublisherData>(new PublisherData{});
pub_data->rmw_node_ = node;
generate_random_gid(pub_data->gid_);
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = QoS::get().best_available_qos(
node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic);
Expand All @@ -61,9 +58,8 @@ std::shared_ptr<PublisherData> PublisherData::make(
rcutils_allocator_t * allocator = &node->context->options.allocator;

const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support);
pub_data->type_support_impl_ = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
pub_data->type_support_ = std::make_unique<MessageTypeSupport>(callbacks);
auto message_type_support = std::make_unique<MessageTypeSupport>(callbacks);

// Convert the type hash to a string so that it can be included in
// the keyexpr.
Expand All @@ -82,7 +78,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
});

std::size_t domain_id = node_info.domain_id_;
pub_data->entity_ = liveliness::Entity::make(
auto entity = liveliness::Entity::make(
z_info_zid(session),
std::to_string(node_id),
std::to_string(publisher_id),
Expand All @@ -91,19 +87,19 @@ std::shared_ptr<PublisherData> PublisherData::make(
liveliness::TopicInfo{
std::move(domain_id),
topic_name,
pub_data->type_support_->get_name(),
message_type_support->get_name(),
type_hash_c_str,
adapted_qos_profile}
);
if (pub_data->entity_ == nullptr) {
if (entity == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to generate keyexpr for liveliness token for the publisher %s.",
topic_name.c_str());
return nullptr;
}
z_owned_keyexpr_t keyexpr = z_keyexpr_new(
pub_data->entity_->topic_info()->topic_keyexpr_.c_str());
entity->topic_info()->topic_keyexpr_.c_str());
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
[&keyexpr]() {
z_keyexpr_drop(z_move(keyexpr));
Expand All @@ -114,6 +110,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
}

// Create a Publication Cache if durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache = std::nullopt;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = adapted_qos_profile.depth;
Expand All @@ -124,26 +121,26 @@ std::shared_ptr<PublisherData> PublisherData::make(
// When such a prefix is added to the PublicationCache, it listens to queries with this extra
// prefix (allowing to be queried in a unique way), but still replies with the original
// publications' key expressions.
z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(pub_data->entity_->zid().c_str());
z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(entity->zid().c_str());
auto always_free_queryable_prefix = rcpputils::make_scope_exit(
[&queryable_prefix]() {
z_keyexpr_drop(z_move(queryable_prefix));
});
pub_cache_opts.queryable_prefix = z_loan(queryable_prefix);
pub_data->pub_cache_ = ze_declare_publication_cache(
pub_cache = ze_declare_publication_cache(
session,
z_loan(keyexpr),
&pub_cache_opts
);
if (!pub_data->pub_cache_.has_value() || !z_check(pub_data->pub_cache_.value())) {
if (!pub_cache.has_value() || !z_check(pub_cache.value())) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
}
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[pub_data]() {
if (pub_data && pub_data->pub_cache_.has_value()) {
z_drop(z_move(pub_data->pub_cache_.value()));
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});

Expand All @@ -156,54 +153,73 @@ std::shared_ptr<PublisherData> PublisherData::make(
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
// TODO(clalancette): What happens if the key name is a valid but empty string?
pub_data->pub_ = z_declare_publisher(
z_owned_publisher_t pub = z_declare_publisher(
session,
z_loan(keyexpr),
&opts
);
if (!z_check(pub_data->pub_)) {
if (!z_check(pub)) {
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}
auto undeclare_z_publisher = rcpputils::make_scope_exit(
[pub_data]() {
z_undeclare_publisher(z_move(pub_data->pub_));
[&pub]() {
z_undeclare_publisher(z_move(pub));
});

pub_data->token_ = zc_liveliness_declare_token(
zc_owned_liveliness_token_t token = zc_liveliness_declare_token(
session,
z_keyexpr(pub_data->entity_->liveliness_keyexpr().c_str()),
z_keyexpr(entity->liveliness_keyexpr().c_str()),
NULL
);
auto free_token = rcpputils::make_scope_exit(
[pub_data]() {
if (pub_data != nullptr) {
z_drop(z_move(pub_data->token_));
}
[&token]() {
z_drop(z_move(token));
});
if (!z_check(pub_data->token_)) {
if (!z_check(token)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
return nullptr;
}

// Initialize the events manager.
pub_data->events_mgr_ = std::make_shared<EventsManager>();

free_token.cancel();
undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();

return pub_data;
return std::shared_ptr<PublisherData>(
new PublisherData{
node,
std::move(entity),
std::move(pub),
std::move(pub_cache),
std::move(token),
type_support->data,
std::move(message_type_support)
});
}

///=============================================================================
PublisherData::PublisherData()
: sequence_number_(1),
PublisherData::PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
entity_(std::move(entity)),
pub_(std::move(pub)),
pub_cache_(std::move(pub_cache)),
token_(std::move(token)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
sequence_number_(1),
is_shutdown_(false)
{
// Do nothing.
generate_random_gid(gid_);
events_mgr_ = std::make_shared<EventsManager>();
}

///=============================================================================
Expand Down
9 changes: 8 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,14 @@ class PublisherData final

private:
// Constructor.
PublisherData();
PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);

// Internal mutex.
mutable std::mutex mutex_;
Expand Down