Skip to content

Commit

Permalink
Querying subs to query newly discovered publishers with publication c…
Browse files Browse the repository at this point in the history
…aches (#269)

* Add queryable_prefix to pub cache

* Store topic keyexpr within Entity

* Trigger querying sub callback in parse_put

* Add options to get

* Trigger cb when qos durability is transient_local

* Set query_accept_replies in querying sub options

* fix: adopt the fix of `ze_querying_subscriber_get` options

* Switch to latest commit on zenoh-c and revert to z_get_options_t

* Adapt the QueryingSubscriber's initial query to the new queryable_prefix (#274)

* Adapt the QueryingSubscriber's initial query to the new queryable_prefix

* Make choice of query_target explicit

Signed-off-by: Yadunund <[email protected]>
Co-authored-by: yuanyuyuan <[email protected]>
Co-authored-by: Julien Enoch <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
4 people authored Aug 30, 2024
1 parent ff7e46c commit 60b72f0
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 158 deletions.
32 changes: 30 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ void GraphCache::parse_put(
if (ignore_from_current_session && is_entity_local(*entity)) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Ignoring parse_put for %s from the same session.\n", entity->keyexpr().c_str());
"Ignoring parse_put for %s from the same session.\n", entity->liveliness_keyexpr().c_str());
return;
}

Expand Down Expand Up @@ -403,6 +403,20 @@ void GraphCache::parse_put(
// Otherwise, the entity represents a node that already exists in the graph.
// Update topic info if required below.
update_topic_maps_for_put(node_it->second, entity);

// If the newly added entity is a publisher with transient_local qos durability,
// we trigger any registered querying subscriber callbacks.
if (entity->type() == liveliness::EntityType::Publisher &&
entity->topic_info().has_value() &&
entity->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL)
{
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
}
}
}
}

///=============================================================================
Expand Down Expand Up @@ -559,7 +573,7 @@ void GraphCache::parse_del(
if (ignore_from_current_session && is_entity_local(*entity)) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Ignoring parse_del for %s from the same session.\n", entity->keyexpr().c_str());
"Ignoring parse_del for %s from the same session.\n", entity->liveliness_keyexpr().c_str());
return;
}
// Lock the graph mutex before accessing the graph.
Expand Down Expand Up @@ -1315,4 +1329,18 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(
status_to_take.current_count_change = 0;
return result;
}

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & keyexpr,
QueryingSubscriberCallback cb)
{
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(std::vector<QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
}
cb_it->second.push_back(std::move(cb));
}

} // namespace rmw_zenoh_cpp
16 changes: 12 additions & 4 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ using GraphNodePtr = std::shared_ptr<GraphNode>;
class GraphCache final
{
public:
/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;
/// Callback to be triggered when a publication cache is detected in the ROS Graph.
using QueryingSubscriberCallback = std::function<void (const std::string & queryable_prefix)>;

/// @brief Constructor
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
Expand Down Expand Up @@ -169,10 +175,6 @@ class GraphCache final
const char * service_type,
bool * is_available) const;

/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;

/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
Expand All @@ -183,6 +185,10 @@ class GraphCache final
/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & keyexpr,
QueryingSubscriberCallback cb);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down Expand Up @@ -281,6 +287,8 @@ class GraphCache final
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::vector<QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
Expand Down
38 changes: 32 additions & 6 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,27 @@ NodeInfo::NodeInfo(
// Do nothing.
}

namespace
{
// Helper function to create a copy of a string after removing any
// leading or trailing slashes.
std::string strip_slashes(const std::string & str)
{
std::string ret = str;
std::size_t start = 0;
std::size_t end = str.length() - 1;
if (str[0] == '/') {
++start;
}
if (str[end] == '/') {
--end;
}
return ret.substr(start, end - start + 1);
}
} // namespace
///=============================================================================
TopicInfo::TopicInfo(
std::size_t domain_id,
std::string name,
std::string type,
std::string type_hash,
Expand All @@ -60,7 +79,13 @@ TopicInfo::TopicInfo(
type_hash_(std::move(type_hash)),
qos_(std::move(qos))
{
// Do nothing.
topic_keyexpr_ = std::to_string(domain_id);
topic_keyexpr_ += "/";
topic_keyexpr_ += strip_slashes(name_);
topic_keyexpr_ += "/";
topic_keyexpr_ += type_;
topic_keyexpr_ += "/";
topic_keyexpr_ += type_hash_;
}

///=============================================================================
Expand Down Expand Up @@ -403,7 +428,7 @@ Entity::Entity(
for (std::size_t i = 0; i < KEYEXPR_INDEX_MAX + 1; ++i) {
bool last = false;
if (!keyexpr_parts[i].empty()) {
this->keyexpr_ += std::move(keyexpr_parts[i]);
this->liveliness_keyexpr_ += std::move(keyexpr_parts[i]);
}
if (i == KEYEXPR_INDEX_MAX || keyexpr_parts[i + 1].empty()) {
last = true;
Expand All @@ -412,9 +437,9 @@ Entity::Entity(
break;
}
// Append the delimiter unless it is the last component.
this->keyexpr_ += KEYEXPR_DELIMITER;
this->liveliness_keyexpr_ += KEYEXPR_DELIMITER;
}
this->guid_ = std::hash<std::string>{}(this->keyexpr_);
this->guid_ = std::hash<std::string>{}(this->liveliness_keyexpr_);
}

///=============================================================================
Expand Down Expand Up @@ -521,6 +546,7 @@ std::shared_ptr<Entity> Entity::make(const std::string & keyexpr)
return nullptr;
}
topic_info = TopicInfo{
domain_id,
demangle_name(std::move(parts[KeyexprIndex::TopicName])),
demangle_name(std::move(parts[KeyexprIndex::TopicType])),
demangle_name(std::move(parts[KeyexprIndex::TopicTypeHash])),
Expand Down Expand Up @@ -590,9 +616,9 @@ std::optional<TopicInfo> Entity::topic_info() const
}

///=============================================================================
std::string Entity::keyexpr() const
std::string Entity::liveliness_keyexpr() const
{
return this->keyexpr_;
return this->liveliness_keyexpr_;
}

///=============================================================================
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ struct TopicInfo
std::string name_;
std::string type_;
std::string type_hash_;
std::string topic_keyexpr_;
rmw_qos_profile_t qos_;

TopicInfo(
std::size_t domain_id,
std::string name,
std::string type,
std::string type_hash,
Expand Down Expand Up @@ -162,7 +164,7 @@ class Entity
std::optional<TopicInfo> topic_info() const;

/// Get the liveliness keyexpr for this entity.
std::string keyexpr() const;
std::string liveliness_keyexpr() const;

// Two entities are equal if their guids are equal.
bool operator==(const Entity & other) const;
Expand All @@ -183,7 +185,7 @@ class Entity
EntityType type_;
NodeInfo node_info_;
std::optional<TopicInfo> topic_info_;
std::string keyexpr_;
std::string liveliness_keyexpr_;
};

///=============================================================================
Expand Down
Loading

0 comments on commit 60b72f0

Please sign in to comment.