Skip to content

Commit

Permalink
Added zenoh_cpp session and token to subscription service_data publis…
Browse files Browse the repository at this point in the history
…her_data node_data

Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 14, 2024
1 parent d241800 commit 1ceae7a
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 114 deletions.
18 changes: 2 additions & 16 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
// zenoh::Session::LivelinessSubscriberOptions options = zenoh::Session::LivelinessSubscriberOptions::create_default();
// options.history = true;
// zenoh::ZResult err;
// auto graph_subscriber_cpp = session_cpp_.liveliness_declare_subscriber(
// graph_subscriber_cpp_ = session_cpp_->liveliness_declare_subscriber(
// keyexpr_cpp,
// [](const zenoh::Sample& s) {
// auto data_ptr = static_cast<Data*>(static_cast<void*>(s.get_attachment().value().get().as_vector().data()));
Expand Down Expand Up @@ -168,8 +168,6 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
// std::move(options),
// &err);

// graph_subscriber_cpp_.push_back(std::move(graph_subscriber_cpp));

// if (err != Z_OK) {
// RMW_SET_ERROR_MSG("unable to create zenoh subscription");
// return RMW_RET_ERROR;
Expand Down Expand Up @@ -227,8 +225,6 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()

z_undeclare_subscriber(z_move(graph_subscriber_));

// graph_subscriber_cpp_.clear();

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// drop SHM subsystem if used
shm_ = std::nullopt;
Expand Down Expand Up @@ -269,14 +265,6 @@ rmw_context_impl_s::rmw_context_impl_s(
throw std::runtime_error("Error setting up zenoh session. " + std::to_string(static_cast<int>(result)));
}

// // Check if shm is enabled.
// z_owned_string_t shm_enabled;
// zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
// auto always_free_shm_enabled = rcpputils::make_scope_exit(
// [&shm_enabled]() {
// z_drop(z_move(shm_enabled));
// });

rmw_ret_t ret;

// TODO(Yadunund) Move this check into a separate thread.
Expand Down Expand Up @@ -340,8 +328,6 @@ rmw_context_impl_s::rmw_context_impl_s(
for (auto res = replies.recv(); std::holds_alternative<zenoh::Reply>(res); res = replies.recv()) {
const auto &sample = std::get<zenoh::Reply>(res).get_ok();
graph_cache->parse_put(sample.get_payload().as_string(), true);
std::cout << "Received ('" << sample.get_keyexpr().as_string_view() << "' : '"
<< sample.get_payload().as_string() << "')\n";
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
Expand Down Expand Up @@ -478,7 +464,7 @@ bool rmw_context_impl_s::create_node_data(
auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(data_->session_),
data_->session_cpp_,
data_->domain_id_,
ns,
node_name,
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class rmw_context_impl_s final
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
z_owned_subscriber_t graph_subscriber_;
std::vector<zenoh::Subscriber<void>> graph_subscriber_cpp_;
std::optional<zenoh::Subscriber<void>> graph_subscriber_cpp_;
// Equivalent to rmw_dds_common::Context's guard condition.
// Guard condition that should be triggered when the graph changes.
std::unique_ptr<rmw_guard_condition_t> graph_guard_condition_;
Expand Down
57 changes: 35 additions & 22 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ namespace rmw_zenoh_cpp
std::shared_ptr<NodeData> NodeData::make(
const rmw_node_t * const node,
std::size_t id,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::size_t domain_id,
const std::string & namespace_,
const std::string & node_name,
const std::string & enclave)
{
// Create the entity.
auto entity = rmw_zenoh_cpp::liveliness::Entity::make(
z_info_zid(session),
auto entity = rmw_zenoh_cpp::liveliness::Entity::make_cpp(
session->get_zid(),
std::to_string(id),
std::to_string(id),
rmw_zenoh_cpp::liveliness::EntityType::Node,
Expand All @@ -58,20 +58,33 @@ std::shared_ptr<NodeData> NodeData::make(

// Create the liveliness token.
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) {
zenoh::ZResult err;
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);

if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
return nullptr;
}
free_token.cancel();
// z_view_keyexpr_t liveliness_ke;
// z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
// zc_owned_liveliness_token_t token;
// auto free_token = rcpputils::make_scope_exit(
// [&token]() {
// z_drop(z_move(token));
// });
// if (zc_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) {
// RMW_ZENOH_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to create liveliness token for the node.");
// return nullptr;
// }
// free_token.cancel();

return std::shared_ptr<NodeData>(
new NodeData{
Expand All @@ -87,7 +100,7 @@ NodeData::NodeData(
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token)
zenoh::LivelinessToken token)
: node_(node),
id_(std::move(id)),
entity_(std::move(entity)),
Expand Down Expand Up @@ -121,7 +134,7 @@ std::size_t NodeData::id() const
///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand All @@ -143,7 +156,7 @@ bool NodeData::create_pub_data(
}

auto pub_data = PublisherData::make(
std::move(session),
session,
node_,
entity_->node_info(),
id_,
Expand Down Expand Up @@ -187,7 +200,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
bool NodeData::create_sub_data(
const rmw_subscription_t * const subscription,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand All @@ -210,7 +223,7 @@ bool NodeData::create_sub_data(
}

auto sub_data = SubscriptionData::make(
std::move(session),
session,
std::move(graph_cache),
node_,
entity_->node_info(),
Expand Down Expand Up @@ -255,7 +268,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
///=============================================================================
bool NodeData::create_service_data(
const rmw_service_t * const service,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand All @@ -277,7 +290,7 @@ bool NodeData::create_service_data(
}

auto service_data = ServiceData::make(
std::move(session),
session,
node_,
entity_->node_info(),
id_,
Expand Down Expand Up @@ -366,7 +379,7 @@ rmw_ret_t NodeData::shutdown()
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
// zc_liveliness_undeclare_token(z_move(token_));

is_shutdown_ = true;
return ret;
Expand Down
12 changes: 6 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class NodeData final
static std::shared_ptr<NodeData> make(
const rmw_node_t * const node,
std::size_t id,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::size_t domain_id,
const std::string & namespace_,
const std::string & node_name,
Expand All @@ -52,7 +52,7 @@ class NodeData final
// Create a new PublisherData for a given rmw_publisher_t.
bool create_pub_data(
const rmw_publisher_t * const publisher,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand All @@ -67,7 +67,7 @@ class NodeData final
// Create a new SubscriptionData for a given rmw_subscription_t.
bool create_sub_data(
const rmw_subscription_t * const subscription,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand All @@ -83,7 +83,7 @@ class NodeData final
// Create a new ServiceData for a given rmw_service_t.
bool create_service_data(
const rmw_service_t * const service,
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand All @@ -110,7 +110,7 @@ class NodeData final
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token);
zenoh::LivelinessToken token);
// Internal mutex.
mutable std::mutex mutex_;
// The rmw_node_t associated with this NodeData.
Expand All @@ -121,7 +121,7 @@ class NodeData final
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity_;
// Liveliness token for the node.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Shutdown flag.
bool is_shutdown_;
// Map of publishers.
Expand Down
55 changes: 34 additions & 21 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace rmw_zenoh_cpp

///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -78,8 +78,8 @@ std::shared_ptr<PublisherData> PublisherData::make(
});

std::size_t domain_id = node_info.domain_id_;
auto entity = liveliness::Entity::make(
z_info_zid(session),
auto entity = liveliness::Entity::make_cpp(
session->get_zid(),
std::to_string(node_id),
std::to_string(publisher_id),
liveliness::EntityType::Publisher,
Expand Down Expand Up @@ -132,7 +132,7 @@ std::shared_ptr<PublisherData> PublisherData::make(

ze_owned_publication_cache_t pub_cache_;
if (ze_declare_publication_cache(
session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts))
z_loan(session->_0), &pub_cache_, z_loan(pub_ke), &pub_cache_opts))
{
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
Expand All @@ -159,31 +159,44 @@ std::shared_ptr<PublisherData> PublisherData::make(
z_undeclare_publisher(z_move(pub));
});
if (z_declare_publisher(
session, &pub, z_loan(pub_ke), &opts) != Z_OK)
z_loan(session->_0), &pub, z_loan(pub_ke), &opts) != Z_OK)
{
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}

std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(
session, &token, z_loan(liveliness_ke),
NULL) != Z_OK)
zenoh::ZResult err;
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);

if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
return nullptr;
}

free_token.cancel();
// z_view_keyexpr_t liveliness_ke;
// z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
// zc_owned_liveliness_token_t token;
// auto free_token = rcpputils::make_scope_exit(
// [&token]() {
// z_drop(z_move(token));
// });
// if (zc_liveliness_declare_token(
// session, &token, z_loan(liveliness_ke),
// NULL) != Z_OK)
// {
// RMW_ZENOH_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to create liveliness token for the publisher.");
// return nullptr;
// }

// free_token.cancel();
undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();

Expand All @@ -205,7 +218,7 @@ PublisherData::PublisherData(
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
Expand Down Expand Up @@ -455,7 +468,7 @@ rmw_ret_t PublisherData::shutdown()
}

// Unregister this publisher from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));
// zc_liveliness_undeclare_token(z_move(token_));
if (pub_cache_.has_value()) {
z_drop(z_move(pub_cache_.value()));
}
Expand Down
6 changes: 3 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PublisherData final
public:
// Make a shared_ptr of PublisherData.
static std::shared_ptr<PublisherData> make(
const z_loaned_session_t * session,
const std::shared_ptr<zenoh::Session> & session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -92,7 +92,7 @@ class PublisherData final
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);

Expand All @@ -107,7 +107,7 @@ class PublisherData final
// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache_;
// Liveliness token for the publisher.
zc_owned_liveliness_token_t token_;
std::optional<zenoh::LivelinessToken> token_;
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
Expand Down
Loading

0 comments on commit 1ceae7a

Please sign in to comment.