Skip to content

Commit

Permalink
liveliness_declare_subscriber
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 19, 2024
1 parent 0635799 commit 0963896
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,26 +213,33 @@ class rmw_context_impl_s::Data final

// Setup the liveliness subscriber to receives updates from the ROS graph
// and update the graph cache.
zc_liveliness_subscriber_options_t sub_options;
zc_liveliness_subscriber_options_default(&sub_options);
z_owned_closure_sample_t callback;
z_closure(&callback, graph_sub_data_handler, nullptr, this);
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str());
auto undeclare_z_sub = rcpputils::make_scope_exit(
[this]() {
z_undeclare_subscriber(z_move(this->graph_subscriber_));
});
if (zc_liveliness_declare_subscriber(
z_loan(session_->_0),
&graph_subscriber_, z_loan(liveliness_ke),
z_move(callback), &sub_options) != Z_OK)
{
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
throw std::runtime_error("Unable to subscribe to ROS graph updates.");
}
zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str());
zenoh::Session::LivelinessSubscriberOptions sub_options =
zenoh::Session::LivelinessSubscriberOptions::create_default();
sub_options.history = true;
graph_subscriber_cpp_ = session_->liveliness_declare_subscriber(
keyexpr_cpp,
[&](const zenoh::Sample& s) {
// Update the graph cache.
std::lock_guard<std::recursive_mutex> lock(mutex_);
switch (s.get_kind()) {
case z_sample_kind_t::Z_SAMPLE_KIND_PUT:
graph_cache_->parse_put(std::string{s.get_keyexpr().as_string_view()});
break;
case z_sample_kind_t::Z_SAMPLE_KIND_DELETE:
graph_cache_->parse_del(std::string{s.get_keyexpr().as_string_view()});
break;
default:
return;
}
},
zenoh::closures::none,
std::move(sub_options),
&err);

undeclare_z_sub.cancel();
if (err != Z_OK) {
throw std::runtime_error("unable to create zenoh subscription. ");
}
}

// Shutdown the Zenoh session.
Expand All @@ -258,7 +265,16 @@ class rmw_context_impl_s::Data final
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
zenoh::ZResult err;
std::move(graph_subscriber_cpp_).value().undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}

if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
Expand All @@ -267,12 +283,6 @@ class rmw_context_impl_s::Data final
// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
}

// // Close the zenoh session
// if (z_close(z_loan_mut(session_), NULL) != Z_OK) {
// RMW_SET_ERROR_MSG("Error while closing zenoh session");
// return RMW_RET_ERROR;
// }
session_.reset();
return RMW_RET_OK;
}
Expand Down Expand Up @@ -437,7 +447,7 @@ class rmw_context_impl_s::Data final
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
z_owned_subscriber_t graph_subscriber_;
std::optional<zenoh::Subscriber<void>> graph_subscriber_cpp_;
// Equivalent to rmw_dds_common::Context's guard condition.
// Guard condition that should be triggered when the graph changes.
std::unique_ptr<rmw_guard_condition_t> graph_guard_condition_;
Expand Down

0 comments on commit 0963896

Please sign in to comment.