Skip to content

Commit

Permalink
Update GraphCache for services & clients along with remaining graph i…
Browse files Browse the repository at this point in the history
…ntrospection methods (#90)

* Put and del liveliness tokens for service and clients

* Remove suffix from service type in liveliness tokens

* Update GraphCache with service and client information

* Update get_entity_names_and_types

* Mangle node and topic names

* Implement get_service_names_and_types

* Implement count methods for services and clients

* Implement check for service availability

* Set channel bound to 0 for zc_liveliness_get

* Set consolidation and other options for queryables

Signed-off-by: Yadunund <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
Yadunund and clalancette authored Jan 17, 2024
1 parent b201d81 commit 9823c65
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 75 deletions.
212 changes: 174 additions & 38 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@


///=============================================================================
// TODO(Yadunund): Since we reuse pub_count_ and sub_count_ for pub/sub and
// service/client consider more general names for these fields.
struct TopicStats
{
// The count of publishers or clients.
std::size_t pub_count_;

// The count of subscriptions or services.
std::size_t sub_count_;

// Constructor which initializes counters to 0.
Expand Down Expand Up @@ -69,8 +74,15 @@ struct GraphNode
using TopicDataMap = std::unordered_map<std::string, TopicDataPtr>;
// Map topic name to TopicDataMap
using TopicMap = std::unordered_map<std::string, TopicDataMap>;

// Entries for pub/sub.
TopicMap pubs_ = {};
TopicMap subs_ = {};

// Entires for service/client.
TopicMap clients_ = {};
TopicMap services_ = {};

};
using GraphNodePtr = std::shared_ptr<GraphNode>;

Expand All @@ -94,6 +106,10 @@ class GraphCache final
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types) const;

rmw_ret_t count_publishers(
const char * topic_name,
size_t * count) const;
Expand All @@ -102,6 +118,14 @@ class GraphCache final
const char * topic_name,
size_t * count) const;

rmw_ret_t count_services(
const char * service_name,
size_t * count) const;

rmw_ret_t count_clients(
const char * service_name,
size_t * count) const;

rmw_ret_t get_entity_names_and_types_by_node(
liveliness::EntityType entity_type,
rcutils_allocator_t * allocator,
Expand All @@ -117,6 +141,11 @@ class GraphCache final
bool no_demangle,
rmw_topic_endpoint_info_array_t * endpoints_info) const;

rmw_ret_t service_server_is_available(
const char * service_name,
const char * service_type,
bool * is_available);

private:
/*
namespace_1:
Expand Down Expand Up @@ -146,8 +175,10 @@ class GraphCache final
// Map namespace to a map of <node_name, GraphNodePtr>.
NamespaceMap graph_ = {};

// Optimize topic lookups across the graph.
// Optimize pub/sub lookups across the graph.
GraphNode::TopicMap graph_topics_ = {};
// Optimize service/client lookups across the graph.
GraphNode::TopicMap graph_services_ = {};

mutable std::mutex graph_mutex_;
};
Expand Down
40 changes: 35 additions & 5 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ static const char PUB_STR[] = "MP";
static const char SUB_STR[] = "MS";
static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char SLASH_REPLACEMENT = '%';

static const std::unordered_map<EntityType, std::string> entity_to_str = {
{EntityType::Node, NODE_STR},
Expand Down Expand Up @@ -149,12 +150,13 @@ Entity::Entity(
token_ss << "/";
}
// Finally append node name.
token_ss << node_info_.name_;
token_ss << mangle_name(node_info_.name_);
// If this entity has a topic info, append it to the token.
if (topic_info_.has_value()) {
const auto & topic_info = this->topic_info_.value();
// Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/".
token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_;
token_ss <<
"/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_;
}

this->keyexpr_ = token_ss.str();
Expand Down Expand Up @@ -255,15 +257,15 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
if (entity_it == str_to_entity.end()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid entity.");
"Received liveliness token with invalid entity %s.", entity_str.c_str());
return std::nullopt;
}

EntityType entity_type = entity_it->second;
std::size_t domain_id = std::stoul(parts[1]);
std::string & id = parts[2];
std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]);
std::string node_name = std::move(parts[5]);
std::string node_name = demangle_name(std::move(parts[5]));
std::optional<TopicInfo> topic_info = std::nullopt;

// Populate topic_info if we have a token for an entity other than a node.
Expand All @@ -275,7 +277,7 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
return std::nullopt;
}
topic_info = TopicInfo{
"/" + std::move(parts[6]),
demangle_name(std::move(parts[6])),
std::move(parts[7]),
std::move(parts[8])};
}
Expand Down Expand Up @@ -387,4 +389,32 @@ bool PublishToken::del(
return true;
}

///=============================================================================
std::string mangle_name(const std::string & input)
{
std::string output = "";
for (std::size_t i = 0; i < input.length(); ++i) {
if (input[i] == '/') {
output += SLASH_REPLACEMENT;
} else {
output += input[i];
}
}
return output;
}

///=============================================================================
std::string demangle_name(const std::string & input)
{
std::string output = "";
for (std::size_t i = 0; i < input.length(); ++i) {
if (input[i] == SLASH_REPLACEMENT) {
output += '/';
} else {
output += input[i];
}
}
return output;
}

} // namespace liveliness
6 changes: 6 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class PublishToken
const std::string & token);
};

/// Replace "/" instances with "%".
std::string mangle_name(const std::string & input);

/// Replace "%" instances with "/".
std::string demangle_name(const std::string & input);

} // namespace liveliness

#endif // DETAIL__LIVELINESS_UTILS_HPP_
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ struct rmw_service_data_t
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

// Liveliness token for the service.
zc_owned_liveliness_token_t token;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
Expand All @@ -171,9 +174,11 @@ struct rmw_service_data_t
struct rmw_client_data_t
{
z_owned_keyexpr_t keyexpr;

z_owned_closure_reply_t zn_closure_reply;

// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::deque<z_owned_reply_t> replies;

Expand Down
14 changes: 10 additions & 4 deletions rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "detail/rmw_data_types.hpp"

#include "rmw/error_handling.h"
#include "rmw/get_service_names_and_types.h"

extern "C"
Expand All @@ -25,9 +27,13 @@ rmw_get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types)
{
static_cast<void>(node);
static_cast<void>(allocator);
static_cast<void>(service_names_and_types);
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT);

return node->context->impl->graph_cache.get_service_names_and_types(
allocator, service_names_and_types);
}
} // extern "C"
4 changes: 0 additions & 4 deletions rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@

#include "detail/rmw_data_types.hpp"

#include "rcutils/strdup.h"

#include "rmw/error_handling.h"
#include "rmw/get_topic_names_and_types.h"

#include "rcpputils/scope_exit.hpp"

extern "C"
{
///==============================================================================
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
"Sending Query '%s' to fetch discovery data...",
liveliness_str.c_str()
);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
z_owned_reply_channel_t channel = zc_reply_fifo_new(0);
zc_liveliness_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()),
z_move(channel.send), NULL);
Expand Down
Loading

0 comments on commit 9823c65

Please sign in to comment.