Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querying subs to query newly discovered publishers with publication caches #269

Merged
merged 12 commits into from
Aug 30, 2024
32 changes: 30 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ void GraphCache::parse_put(
if (ignore_from_current_session && is_entity_local(*entity)) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Ignoring parse_put for %s from the same session.\n", entity->keyexpr().c_str());
"Ignoring parse_put for %s from the same session.\n", entity->liveliness_keyexpr().c_str());
return;
}

Expand Down Expand Up @@ -403,6 +403,20 @@ void GraphCache::parse_put(
// Otherwise, the entity represents a node that already exists in the graph.
// Update topic info if required below.
update_topic_maps_for_put(node_it->second, entity);

// If the newly added entity is a publisher with transient_local qos durability,
// we trigger any registered querying subscriber callbacks.
if (entity->type() == liveliness::EntityType::Publisher &&
entity->topic_info().has_value() &&
entity->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL)
{
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
}
}
}
}

///=============================================================================
Expand Down Expand Up @@ -559,7 +573,7 @@ void GraphCache::parse_del(
if (ignore_from_current_session && is_entity_local(*entity)) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Ignoring parse_del for %s from the same session.\n", entity->keyexpr().c_str());
"Ignoring parse_del for %s from the same session.\n", entity->liveliness_keyexpr().c_str());
return;
}
// Lock the graph mutex before accessing the graph.
Expand Down Expand Up @@ -1315,4 +1329,18 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(
status_to_take.current_count_change = 0;
return result;
}

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & keyexpr,
QueryingSubscriberCallback cb)
{
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(std::vector<QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
}
cb_it->second.push_back(std::move(cb));
}

} // namespace rmw_zenoh_cpp
16 changes: 12 additions & 4 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ using GraphNodePtr = std::shared_ptr<GraphNode>;
class GraphCache final
{
public:
/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;
/// Callback to be triggered when a publication cache is detected in the ROS Graph.
using QueryingSubscriberCallback = std::function<void (const std::string & queryable_prefix)>;

/// @brief Constructor
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
Expand Down Expand Up @@ -169,10 +175,6 @@ class GraphCache final
const char * service_type,
bool * is_available) const;

/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;

/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
Expand All @@ -183,6 +185,10 @@ class GraphCache final
/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & keyexpr,
QueryingSubscriberCallback cb);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down Expand Up @@ -281,6 +287,8 @@ class GraphCache final
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::vector<QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
Expand Down
38 changes: 32 additions & 6 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,27 @@ NodeInfo::NodeInfo(
// Do nothing.
}

namespace
{
// Helper function to create a copy of a string after removing any
// leading or trailing slashes.
std::string strip_slashes(const std::string & str)
{
std::string ret = str;
std::size_t start = 0;
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
std::size_t end = str.length() - 1;
if (str[0] == '/') {
++start;
}
if (str[end] == '/') {
--end;
}
return ret.substr(start, end - start + 1);
}
} // namespace
///=============================================================================
TopicInfo::TopicInfo(
std::size_t domain_id,
std::string name,
std::string type,
std::string type_hash,
Expand All @@ -60,7 +79,13 @@ TopicInfo::TopicInfo(
type_hash_(std::move(type_hash)),
qos_(std::move(qos))
{
// Do nothing.
topic_keyexpr_ = std::to_string(domain_id);
topic_keyexpr_ += "/";
topic_keyexpr_ += strip_slashes(name_);
topic_keyexpr_ += "/";
topic_keyexpr_ += type_;
topic_keyexpr_ += "/";
topic_keyexpr_ += type_hash_;
}

///=============================================================================
Expand Down Expand Up @@ -403,7 +428,7 @@ Entity::Entity(
for (std::size_t i = 0; i < KEYEXPR_INDEX_MAX + 1; ++i) {
bool last = false;
if (!keyexpr_parts[i].empty()) {
this->keyexpr_ += std::move(keyexpr_parts[i]);
this->liveliness_keyexpr_ += std::move(keyexpr_parts[i]);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
}
if (i == KEYEXPR_INDEX_MAX || keyexpr_parts[i + 1].empty()) {
last = true;
Expand All @@ -412,9 +437,9 @@ Entity::Entity(
break;
}
// Append the delimiter unless it is the last component.
this->keyexpr_ += KEYEXPR_DELIMITER;
this->liveliness_keyexpr_ += KEYEXPR_DELIMITER;
}
this->guid_ = std::hash<std::string>{}(this->keyexpr_);
this->guid_ = std::hash<std::string>{}(this->liveliness_keyexpr_);
}

///=============================================================================
Expand Down Expand Up @@ -521,6 +546,7 @@ std::shared_ptr<Entity> Entity::make(const std::string & keyexpr)
return nullptr;
}
topic_info = TopicInfo{
domain_id,
demangle_name(std::move(parts[KeyexprIndex::TopicName])),
demangle_name(std::move(parts[KeyexprIndex::TopicType])),
demangle_name(std::move(parts[KeyexprIndex::TopicTypeHash])),
Expand Down Expand Up @@ -590,9 +616,9 @@ std::optional<TopicInfo> Entity::topic_info() const
}

///=============================================================================
std::string Entity::keyexpr() const
std::string Entity::liveliness_keyexpr() const
{
return this->keyexpr_;
return this->liveliness_keyexpr_;
}

///=============================================================================
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ struct TopicInfo
std::string name_;
std::string type_;
std::string type_hash_;
std::string topic_keyexpr_;
rmw_qos_profile_t qos_;

TopicInfo(
std::size_t domain_id,
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
std::string name,
std::string type,
std::string type_hash,
Expand Down Expand Up @@ -162,7 +164,7 @@ class Entity
std::optional<TopicInfo> topic_info() const;

/// Get the liveliness keyexpr for this entity.
std::string keyexpr() const;
std::string liveliness_keyexpr() const;

// Two entities are equal if their guids are equal.
bool operator==(const Entity & other) const;
Expand All @@ -183,7 +185,7 @@ class Entity
EntityType type_;
NodeInfo node_info_;
std::optional<TopicInfo> topic_info_;
std::string keyexpr_;
std::string liveliness_keyexpr_;
};

///=============================================================================
Expand Down
Loading