Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querying subs to query newly discovered publishers with publication caches #269

Merged
merged 12 commits into from
Aug 30, 2024
Prev Previous commit
Next Next commit
Trigger querying sub callback in parse_put
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Aug 28, 2024

Verified

This commit was signed with the committer’s verified signature.
Yadunund yadunund
commit cf304745d3accd4a209c0d6139aafd804bc64c5f
11 changes: 11 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
@@ -403,6 +403,17 @@ void GraphCache::parse_put(
// Otherwise, the entity represents a node that already exists in the graph.
// Update topic info if required below.
update_topic_maps_for_put(node_it->second, entity);

// If the newly added entity is a publisher with transient_local qos durability,
// we trigger any registered querying subscriber callbacks.
if (entity->topic_info().has_value() && entity->type() == liveliness::EntityType::Publisher) {
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
}
}
}
}

///=============================================================================
35 changes: 23 additions & 12 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
@@ -655,8 +655,7 @@ rmw_create_publisher(
// Set the queryable_prefix to the session id so that querying subscribers can specify this
// session id to obtain latest data from this specific publication caches when querying over
// the same keyexpression.
z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(
rmw_zenoh_cpp::liveliness::zid_to_str(zid).c_str());
z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(publisher_data->entity->zid().c_str());
auto always_free_queryable_prefix =rcpputils::make_scope_exit(
[&queryable_prefix]() {
z_keyexpr_drop(z_move(queryable_prefix));
@@ -1520,16 +1519,28 @@ rmw_create_subscription(
}
// Register the querying subscriber with the graph cache to get latest
// messages from publishers that were discovered after their first publication.
// context_impl->graph_cache->set_querying_subscriber_callback(
// keyexpr,
// [sub_data](const std::string & queryable_prefix) -> void
// {
// if (sub_data == nullptr) {
// return;
// }
// // ze_querying_subscriber_get()
// }
// );
context_impl->graph_cache->set_querying_subscriber_callback(
sub_data->entity->topic_info()->topic_keyexpr_,
[sub_data](const std::string & queryable_prefix) -> void
{
if (sub_data == nullptr) {
return;
}
const std::string selector = queryable_prefix +
"/" +
sub_data->entity->topic_info()->topic_keyexpr_;
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"QueryingSubscriberCallback triggered over %s.",
selector.c_str()
);
ze_querying_subscriber_get(
z_loan(std::get<ze_owned_querying_subscriber_t>(sub_data->sub)),
z_keyexpr(selector.c_str()),
nullptr
);
}
);
} else {
// Create a regular subscriber for all other durability settings.
z_subscriber_options_t sub_options = z_subscriber_options_default();