Skip to content

Commit

Permalink
Initialize PublsiherData only after all members are valid (#289)
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored Oct 3, 2024
1 parent cf8e8a2 commit 8ba1c91
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 35 deletions.
8 changes: 7 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,13 @@ rmw_ret_t NodeData::shutdown()
for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
ret = pub_it->second->shutdown();
if (ret != RMW_RET_OK) {
return ret;
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.",
pub_it->second->topic_info().name_.c_str(),
id_,
ret
);
}
}

Expand Down
82 changes: 49 additions & 33 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ std::shared_ptr<PublisherData> PublisherData::make(
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
auto pub_data = std::shared_ptr<PublisherData>(new PublisherData{});
pub_data->rmw_node_ = node;
generate_random_gid(pub_data->gid_);
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = QoS::get().best_available_qos(
node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic);
Expand All @@ -61,9 +58,8 @@ std::shared_ptr<PublisherData> PublisherData::make(
rcutils_allocator_t * allocator = &node->context->options.allocator;

const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support);
pub_data->type_support_impl_ = type_support->data;
auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
pub_data->type_support_ = std::make_unique<MessageTypeSupport>(callbacks);
auto message_type_support = std::make_unique<MessageTypeSupport>(callbacks);

// Convert the type hash to a string so that it can be included in
// the keyexpr.
Expand All @@ -82,7 +78,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
});

std::size_t domain_id = node_info.domain_id_;
pub_data->entity_ = liveliness::Entity::make(
auto entity = liveliness::Entity::make(
z_info_zid(session),
std::to_string(node_id),
std::to_string(publisher_id),
Expand All @@ -91,19 +87,19 @@ std::shared_ptr<PublisherData> PublisherData::make(
liveliness::TopicInfo{
std::move(domain_id),
topic_name,
pub_data->type_support_->get_name(),
message_type_support->get_name(),
type_hash_c_str,
adapted_qos_profile}
);
if (pub_data->entity_ == nullptr) {
if (entity == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to generate keyexpr for liveliness token for the publisher %s.",
topic_name.c_str());
return nullptr;
}
z_owned_keyexpr_t keyexpr = z_keyexpr_new(
pub_data->entity_->topic_info()->topic_keyexpr_.c_str());
entity->topic_info()->topic_keyexpr_.c_str());
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
[&keyexpr]() {
z_keyexpr_drop(z_move(keyexpr));
Expand All @@ -114,6 +110,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
}

// Create a Publication Cache if durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache = std::nullopt;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = adapted_qos_profile.depth;
Expand All @@ -124,26 +121,26 @@ std::shared_ptr<PublisherData> PublisherData::make(
// When such a prefix is added to the PublicationCache, it listens to queries with this extra
// prefix (allowing to be queried in a unique way), but still replies with the original
// publications' key expressions.
z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(pub_data->entity_->zid().c_str());
z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(entity->zid().c_str());
auto always_free_queryable_prefix = rcpputils::make_scope_exit(
[&queryable_prefix]() {
z_keyexpr_drop(z_move(queryable_prefix));
});
pub_cache_opts.queryable_prefix = z_loan(queryable_prefix);
pub_data->pub_cache_ = ze_declare_publication_cache(
pub_cache = ze_declare_publication_cache(
session,
z_loan(keyexpr),
&pub_cache_opts
);
if (!pub_data->pub_cache_.has_value() || !z_check(pub_data->pub_cache_.value())) {
if (!pub_cache.has_value() || !z_check(pub_cache.value())) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
}
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[pub_data]() {
if (pub_data && pub_data->pub_cache_.has_value()) {
z_drop(z_move(pub_data->pub_cache_.value()));
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});

Expand All @@ -156,54 +153,73 @@ std::shared_ptr<PublisherData> PublisherData::make(
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
// TODO(clalancette): What happens if the key name is a valid but empty string?
pub_data->pub_ = z_declare_publisher(
z_owned_publisher_t pub = z_declare_publisher(
session,
z_loan(keyexpr),
&opts
);
if (!z_check(pub_data->pub_)) {
if (!z_check(pub)) {
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}
auto undeclare_z_publisher = rcpputils::make_scope_exit(
[pub_data]() {
z_undeclare_publisher(z_move(pub_data->pub_));
[&pub]() {
z_undeclare_publisher(z_move(pub));
});

pub_data->token_ = zc_liveliness_declare_token(
zc_owned_liveliness_token_t token = zc_liveliness_declare_token(
session,
z_keyexpr(pub_data->entity_->liveliness_keyexpr().c_str()),
z_keyexpr(entity->liveliness_keyexpr().c_str()),
NULL
);
auto free_token = rcpputils::make_scope_exit(
[pub_data]() {
if (pub_data != nullptr) {
z_drop(z_move(pub_data->token_));
}
[&token]() {
z_drop(z_move(token));
});
if (!z_check(pub_data->token_)) {
if (!z_check(token)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
return nullptr;
}

// Initialize the events manager.
pub_data->events_mgr_ = std::make_shared<EventsManager>();

free_token.cancel();
undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();

return pub_data;
return std::shared_ptr<PublisherData>(
new PublisherData{
node,
std::move(entity),
std::move(pub),
std::move(pub_cache),
std::move(token),
type_support->data,
std::move(message_type_support)
});
}

///=============================================================================
PublisherData::PublisherData()
: sequence_number_(1),
PublisherData::PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
entity_(std::move(entity)),
pub_(std::move(pub)),
pub_cache_(std::move(pub_cache)),
token_(std::move(token)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
sequence_number_(1),
is_shutdown_(false)
{
// Do nothing.
generate_random_gid(gid_);
events_mgr_ = std::make_shared<EventsManager>();
}

///=============================================================================
Expand Down
9 changes: 8 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,14 @@ class PublisherData final

private:
// Constructor.
PublisherData();
PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);

// Internal mutex.
mutable std::mutex mutex_;
Expand Down

0 comments on commit 8ba1c91

Please sign in to comment.