diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 35b18c4f..b34e68d4 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -68,9 +68,16 @@ void GraphCache::parse_put(const std::string & keyexpr) // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update graph_topics_ auto add_topic_data = - [this](const Entity & entity, GraphNode & graph_node) -> void + [](const Entity & entity, GraphNode & graph_node, GraphCache & graph_cache) -> void { - if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + if (entity.type() != EntityType::Publisher && + entity.type() != EntityType::Subscription && + entity.type() != EntityType::Service && + entity.type() != EntityType::Client) + { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "add_topic_data() for invalid EntityType. Report this."); return; } @@ -84,12 +91,32 @@ void GraphCache::parse_put(const std::string & keyexpr) } const liveliness::TopicInfo topic_info = entity.topic_info().value(); - GraphNode::TopicMap & topic_map = entity.type() == - EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; - const std::string entity_desc = entity.type() == - EntityType::Publisher ? "publisher" : "subscription"; - const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + std::string entity_desc = ""; + GraphNode::TopicMap & topic_map = + [](const Entity & entity, GraphNode & graph_node, + std::string & entity_desc) -> GraphNode::TopicMap & + { + if (entity.type() == EntityType::Publisher) { + entity_desc = "publisher"; + return graph_node.pubs_; + } else if (entity.type() == EntityType::Subscription) { + entity_desc = "subscription"; + return graph_node.subs_; + } else if (entity.type() == EntityType::Service) { + entity_desc = "service"; + return graph_node.services_; + + } else { + entity_desc = "client"; + return graph_node.clients_; + } + }(entity, graph_node, entity_desc); + // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. + // Similarly, subscriptions and services are equivalent. + const std::size_t pub_count = entity.type() == EntityType::Publisher || + entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; + TopicDataPtr graph_topic_data = std::make_shared( topic_info, TopicStats{pub_count, sub_count}); @@ -115,14 +142,18 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph - GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_); - if (cache_topic_it == graph_topics_.end()) { + GraphNode::TopicMap & graph_endpoints = + entity.type() == EntityType::Publisher || entity.type() == EntityType::Subscription ? + graph_cache.graph_topics_ : + graph_cache.graph_services_; + GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_); + if (cache_topic_it == graph_endpoints.end()) { // First time this topic name is added to the graph. std::shared_ptr topic_data_ptr = std::make_shared( topic_info, TopicStats{pub_count, sub_count} ); - graph_topics_[topic_info.name_] = GraphNode::TopicDataMap{ + graph_endpoints[topic_info.name_] = GraphNode::TopicDataMap{ {topic_info.type_, topic_data_ptr} }; } else { @@ -153,8 +184,9 @@ void GraphCache::parse_put(const std::string & keyexpr) }; // Helper lambda to convert an Entity into a GraphNode. + // Note: this will update bookkeeping variables in GraphCache. auto make_graph_node = - [&](const Entity & entity) -> std::shared_ptr + [&add_topic_data](const Entity & entity, GraphCache & graph_cache) -> std::shared_ptr { auto graph_node = std::make_shared(); graph_node->id_ = entity.id(); @@ -166,8 +198,8 @@ void GraphCache::parse_put(const std::string & keyexpr) // Token was for a node. return graph_node; } - // Add pub/sub entries. - add_topic_data(entity, *graph_node); + // Add endpoint entries. + add_topic_data(entity, *graph_node, graph_cache); return graph_node; }; @@ -179,7 +211,7 @@ void GraphCache::parse_put(const std::string & keyexpr) NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { NodeMap node_map = { - {entity.node_name(), make_graph_node(entity)}}; + {entity.node_name(), make_graph_node(entity, *this)}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", @@ -205,7 +237,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // Either the first time a node with this name is added or with an existing // name but unique id. NodeMap::iterator insertion_it = - ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); if (insertion_it != ns_it->second.end()) { RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -242,7 +274,7 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Update the graph based on the entity. - add_topic_data(entity, *(node_it->second)); + add_topic_data(entity, *(node_it->second), *this); } ///============================================================================= @@ -257,15 +289,19 @@ void GraphCache::parse_del(const std::string & keyexpr) // Helper lambda to update graph_topics_. auto update_graph_topics = - [&](const liveliness::TopicInfo topic_info, std::size_t pub_count, - std::size_t sub_count) -> void + [](const liveliness::TopicInfo topic_info, const EntityType entity_type, std::size_t pub_count, + std::size_t sub_count, GraphCache & graph_cache) -> void { + GraphNode::TopicMap & graph_endpoints = + entity_type == EntityType::Publisher || entity_type == EntityType::Subscription ? + graph_cache.graph_topics_ : + graph_cache.graph_services_; GraphNode::TopicMap::iterator cache_topic_it = - graph_topics_.find(topic_info.name_); - if (cache_topic_it == graph_topics_.end()) { + graph_endpoints.find(topic_info.name_); + if (cache_topic_it == graph_endpoints.end()) { // This should not happen. RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + "rmw_zenoh_cpp", "topic_key %s not found in graph_endpoints. Report this.", topic_info.name_.c_str()); } else { GraphNode::TopicDataMap::iterator cache_topic_data_it = @@ -281,7 +317,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // If the topic does not have any TopicData entries, erase the topic from the map. if (cache_topic_it->second.empty()) { - graph_topics_.erase(cache_topic_it); + graph_endpoints.erase(cache_topic_it); } } } @@ -290,9 +326,17 @@ void GraphCache::parse_del(const std::string & keyexpr) // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto remove_topic_data = - [&](const Entity & entity, GraphNode & graph_node) -> void + [&update_graph_topics](const Entity & entity, GraphNode & graph_node, + GraphCache & graph_cache) -> void { - if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + if (entity.type() != EntityType::Publisher && + entity.type() != EntityType::Subscription && + entity.type() != EntityType::Service && + entity.type() != EntityType::Client) + { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "remove_topic_data() for invalid EntityType. Report this."); return; } @@ -304,13 +348,31 @@ void GraphCache::parse_del(const std::string & keyexpr) "remove_topic_data() called without valid TopicInfo. Report this."); return; } - const liveliness::TopicInfo topic_info = entity.topic_info().value(); - GraphNode::TopicMap & topic_map = entity.type() == - EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; - const std::string entity_desc = entity.type() == - EntityType::Publisher ? "publisher" : "subscription"; - const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + std::string entity_desc = ""; + GraphNode::TopicMap & topic_map = + [](const Entity & entity, GraphNode & graph_node, + std::string & entity_desc) -> GraphNode::TopicMap & + { + if (entity.type() == EntityType::Publisher) { + entity_desc = "publisher"; + return graph_node.pubs_; + } else if (entity.type() == EntityType::Subscription) { + entity_desc = "subscription"; + return graph_node.subs_; + } else if (entity.type() == EntityType::Service) { + entity_desc = "service"; + return graph_node.services_; + + } else { + entity_desc = "client"; + return graph_node.clients_; + } + }(entity, graph_node, entity_desc); + // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. + // Similarly, subscriptions and services are equivalent. + const std::size_t pub_count = entity.type() == EntityType::Publisher || + entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); @@ -346,7 +408,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - update_graph_topics(topic_info, pub_count, sub_count); + update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache); RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -401,19 +463,25 @@ void GraphCache::parse_del(const std::string & keyexpr) entity.node_name().c_str() ); auto remove_topics = - [&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void { - std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0; + [&update_graph_topics](const GraphNode::TopicMap & topic_map, + const EntityType & entity_type, GraphCache & graph_cache) -> void { + std::size_t pub_count = entity_type == EntityType::Publisher || + entity_type == EntityType::Client ? 1 : 0; std::size_t sub_count = !pub_count; for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) { for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); ++type_it) { - update_graph_topics(type_it->second->info_, pub_count, sub_count); + update_graph_topics( + type_it->second->info_, entity_type, pub_count, sub_count, + graph_cache); } } }; - remove_topics(graph_node->pubs_, EntityType::Publisher); - remove_topics(graph_node->subs_, EntityType::Subscription); + remove_topics(graph_node->pubs_, EntityType::Publisher, *this); + remove_topics(graph_node->subs_, EntityType::Subscription, *this); + remove_topics(graph_node->services_, EntityType::Service, *this); + remove_topics(graph_node->clients_, EntityType::Client, *this); } ns_it->second.erase(node_it); RCUTILS_LOG_WARN_NAMED( @@ -433,7 +501,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Update the graph based on the entity. - remove_topic_data(entity, *(node_it->second)); + remove_topic_data(entity, *(node_it->second), *this); } ///============================================================================= @@ -645,6 +713,18 @@ rmw_ret_t GraphCache::get_topic_names_and_types( return fill_names_and_types(graph_topics_, allocator, topic_names_and_types); } +///============================================================================= +rmw_ret_t GraphCache::get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const +{ + RCUTILS_CHECK_ALLOCATOR_WITH_MSG( + allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); + + std::lock_guard lock(graph_mutex_); + return fill_names_and_types(graph_services_, allocator, service_names_and_types); +} + ///============================================================================= rmw_ret_t GraphCache::count_publishers( const char * topic_name, @@ -679,6 +759,40 @@ rmw_ret_t GraphCache::count_subscriptions( return RMW_RET_OK; } +///============================================================================= +rmw_ret_t GraphCache::count_services( + const char * service_name, + size_t * count) const +{ + *count = 0; + std::lock_guard lock(graph_mutex_); + if (graph_services_.count(service_name) != 0) { + for (const std::pair & it : graph_services_.at(service_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.sub_count_; + } + } + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t GraphCache::count_clients( + const char * service_name, + size_t * count) const +{ + *count = 0; + std::lock_guard lock(graph_mutex_); + if (graph_services_.count(service_name) != 0) { + for (const std::pair & it : graph_services_.at(service_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.pub_count_; + } + } + + return RMW_RET_OK; +} + ///============================================================================= rmw_ret_t GraphCache::get_entity_names_and_types_by_node( liveliness::EntityType entity_type, @@ -737,6 +851,10 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node( return fill_names_and_types(node_it->second->pubs_, allocator, names_and_types); } else if (entity_type == EntityType::Subscription) { return fill_names_and_types(node_it->second->subs_, allocator, names_and_types); + } else if (entity_type == EntityType::Service) { + return fill_names_and_types(node_it->second->services_, allocator, names_and_types); + } else if (entity_type == EntityType::Client) { + return fill_names_and_types(node_it->second->clients_, allocator, names_and_types); } else { return RMW_RET_OK; } @@ -787,7 +905,6 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( } } - rmw_ret_t ret = rmw_topic_endpoint_info_array_init_with_size( endpoints_info, nodes.size(), @@ -851,3 +968,22 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( cleanup_endpoints_info.cancel(); return RMW_RET_OK; } + +///============================================================================= +rmw_ret_t GraphCache::service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available) +{ + *is_available = false; + std::lock_guard lock(graph_mutex_); + GraphNode::TopicMap::iterator service_it = graph_services_.find(service_name); + if (service_it != graph_services_.end()) { + GraphNode::TopicDataMap::iterator type_it = service_it->second.find(service_type); + if (type_it != service_it->second.end()) { + *is_available = true; + } + } + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index ca65c0f9..adeb3e01 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -34,9 +34,14 @@ ///============================================================================= +// TODO(Yadunund): Since we reuse pub_count_ and sub_count_ for pub/sub and +// service/client consider more general names for these fields. struct TopicStats { + // The count of publishers or clients. std::size_t pub_count_; + + // The count of subscriptions or services. std::size_t sub_count_; // Constructor which initializes counters to 0. @@ -69,8 +74,15 @@ struct GraphNode using TopicDataMap = std::unordered_map; // Map topic name to TopicDataMap using TopicMap = std::unordered_map; + + // Entries for pub/sub. TopicMap pubs_ = {}; TopicMap subs_ = {}; + + // Entires for service/client. + TopicMap clients_ = {}; + TopicMap services_ = {}; + }; using GraphNodePtr = std::shared_ptr; @@ -94,6 +106,10 @@ class GraphCache final bool no_demangle, rmw_names_and_types_t * topic_names_and_types) const; + rmw_ret_t get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const; + rmw_ret_t count_publishers( const char * topic_name, size_t * count) const; @@ -102,6 +118,14 @@ class GraphCache final const char * topic_name, size_t * count) const; + rmw_ret_t count_services( + const char * service_name, + size_t * count) const; + + rmw_ret_t count_clients( + const char * service_name, + size_t * count) const; + rmw_ret_t get_entity_names_and_types_by_node( liveliness::EntityType entity_type, rcutils_allocator_t * allocator, @@ -117,6 +141,11 @@ class GraphCache final bool no_demangle, rmw_topic_endpoint_info_array_t * endpoints_info) const; + rmw_ret_t service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available); + private: /* namespace_1: @@ -146,8 +175,10 @@ class GraphCache final // Map namespace to a map of . NamespaceMap graph_ = {}; - // Optimize topic lookups across the graph. + // Optimize pub/sub lookups across the graph. GraphNode::TopicMap graph_topics_ = {}; + // Optimize service/client lookups across the graph. + GraphNode::TopicMap graph_services_ = {}; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index b1b1b0f1..7118b64b 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -66,6 +66,7 @@ static const char PUB_STR[] = "MP"; static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; +static const char SLASH_REPLACEMENT = '%'; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -149,12 +150,13 @@ Entity::Entity( token_ss << "/"; } // Finally append node name. - token_ss << node_info_.name_; + token_ss << mangle_name(node_info_.name_); // If this entity has a topic info, append it to the token. if (topic_info_.has_value()) { const auto & topic_info = this->topic_info_.value(); // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". - token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_; + token_ss << + "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; } this->keyexpr_ = token_ss.str(); @@ -255,7 +257,7 @@ std::optional Entity::make(const std::string & keyexpr) if (entity_it == str_to_entity.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Received liveliness token with invalid entity."); + "Received liveliness token with invalid entity %s.", entity_str.c_str()); return std::nullopt; } @@ -263,7 +265,7 @@ std::optional Entity::make(const std::string & keyexpr) std::size_t domain_id = std::stoul(parts[1]); std::string & id = parts[2]; std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]); - std::string node_name = std::move(parts[5]); + std::string node_name = demangle_name(std::move(parts[5])); std::optional topic_info = std::nullopt; // Populate topic_info if we have a token for an entity other than a node. @@ -275,7 +277,7 @@ std::optional Entity::make(const std::string & keyexpr) return std::nullopt; } topic_info = TopicInfo{ - "/" + std::move(parts[6]), + demangle_name(std::move(parts[6])), std::move(parts[7]), std::move(parts[8])}; } @@ -387,4 +389,32 @@ bool PublishToken::del( return true; } +///============================================================================= +std::string mangle_name(const std::string & input) +{ + std::string output = ""; + for (std::size_t i = 0; i < input.length(); ++i) { + if (input[i] == '/') { + output += SLASH_REPLACEMENT; + } else { + output += input[i]; + } + } + return output; +} + +///============================================================================= +std::string demangle_name(const std::string & input) +{ + std::string output = ""; + for (std::size_t i = 0; i < input.length(); ++i) { + if (input[i] == SLASH_REPLACEMENT) { + output += '/'; + } else { + output += input[i]; + } + } + return output; +} + } // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 13e09cb4..66f2f11b 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -133,6 +133,12 @@ class PublishToken const std::string & token); }; +/// Replace "/" instances with "%". +std::string mangle_name(const std::string & input); + +/// Replace "%" instances with "/". +std::string demangle_name(const std::string & input); + } // namespace liveliness #endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 3f76fd91..de45faba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -146,6 +146,9 @@ struct rmw_service_data_t z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; + // Liveliness token for the service. + zc_owned_liveliness_token_t token; + const void * request_type_support_impl; const void * response_type_support_impl; const char * typesupport_identifier; @@ -171,9 +174,11 @@ struct rmw_service_data_t struct rmw_client_data_t { z_owned_keyexpr_t keyexpr; - z_owned_closure_reply_t zn_closure_reply; + // Liveliness token for the client. + zc_owned_liveliness_token_t token; + std::mutex message_mutex; std::deque replies; diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index ae8e54c4..9cf19aae 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "detail/rmw_data_types.hpp" +#include "rmw/error_handling.h" #include "rmw/get_service_names_and_types.h" extern "C" @@ -25,9 +27,13 @@ rmw_get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types) { - static_cast(node); - static_cast(allocator); - static_cast(service_names_and_types); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.get_service_names_and_types( + allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index a4c80b4e..c5218e1e 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -14,13 +14,9 @@ #include "detail/rmw_data_types.hpp" -#include "rcutils/strdup.h" - #include "rmw/error_handling.h" #include "rmw/get_topic_names_and_types.h" -#include "rcpputils/scope_exit.hpp" - extern "C" { ///============================================================================== diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 1e6da94c..1b612d88 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -255,7 +255,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(channel.send), NULL); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 38a6c662..5653fae6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1851,6 +1851,51 @@ rmw_create_client( return nullptr; } + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = client_data->request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), rmw_client->service_name); + return nullptr; + } + const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), + liveliness::EntityType::Client, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_client->service_name, + std::move(service_type), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client."); + return nullptr; + } + client_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(liveliness_entity->keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [client_data]() { + if (client_data != nullptr) { + z_drop(z_move(client_data->token)); + } + }); + if (!z_check(client_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return nullptr; + } + rmw_client->data = client_data; free_rmw_client.cancel(); @@ -1901,6 +1946,7 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) z_reply_drop(&reply); } client_data->replies.clear(); + z_drop(z_move(client_data->token)); allocator->deallocate(client_data->request_type_support, allocator->state); allocator->deallocate(client_data->response_type_support, allocator->state); @@ -2018,7 +2064,10 @@ rmw_send_request( opts.attachment = z_bytes_map_as_attachment(&map); - opts.target = Z_QUERY_TARGET_ALL; + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // Latest consolidation guarantees unicity of replies for the same key expression. It optimizes bandwidth. + // Default is None which imples replies may come in any order and any number. + opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); @@ -2372,12 +2421,14 @@ rmw_create_service( } z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data); - + // Configure the queryable to process complete queries. + z_queryable_options_t qable_options = z_queryable_options_default(); + qable_options.complete = true; service_data->qable = z_declare_queryable( z_loan(context_impl->session), z_loan(service_data->keyexpr), z_move(callback), - nullptr); + &qable_options); if (!z_check(service_data->qable)) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); @@ -2388,7 +2439,50 @@ rmw_create_service( z_undeclare_queryable(z_move(service_data->qable)); }); - // TODO(francocipollone): Update graph cache. + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = service_data->response_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Response_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for service %s. Report this bug", + service_type.c_str(), rmw_service->service_name); + return nullptr; + } + const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), + liveliness::EntityType::Service, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_service->service_name, + std::move(service_type), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the service."); + return nullptr; + } + service_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(liveliness_entity->keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [service_data]() { + if (service_data != nullptr) { + z_drop(z_move(service_data->token)); + } + }); + if (!z_check(service_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the service."); + return nullptr; + } rmw_service->data = service_data; @@ -2402,6 +2496,8 @@ rmw_create_service( free_response_type_support.cancel(); free_ros_keyexpr.cancel(); undeclare_z_queryable.cancel(); + free_token.cancel(); + return rmw_service; } @@ -2439,8 +2535,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) for (z_owned_query_t & query : service_data->query_queue) { z_drop(z_move(query)); } - service_data->query_queue.clear(); - service_data->sequence_to_query_map.clear(); + z_drop(z_move(service_data->token)); allocator->deallocate(service_data->request_type_support, allocator->state); allocator->deallocate(service_data->response_type_support, allocator->state); @@ -2449,7 +2544,6 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) allocator->deallocate(const_cast(service->service_name), allocator->state); allocator->deallocate(service, allocator->state); - // TODO(francocipollone): Update graph cache. return RMW_RET_OK; } @@ -3117,10 +3211,26 @@ rmw_count_clients( const char * service_name, size_t * count) { - static_cast(node); - static_cast(service_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_clients(service_name, count); } //============================================================================== @@ -3131,10 +3241,26 @@ rmw_count_services( const char * service_name, size_t * count) { - static_cast(node); - static_cast(service_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_services(service_name, count); } //============================================================================== @@ -3177,12 +3303,37 @@ rmw_service_server_is_available( const rmw_client_t * client, bool * is_available) { - // TODO(francocipollone): Provide a proper implementation. - // We need graph cache information for this. - *is_available = true; - static_cast(node); - static_cast(client); - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); + + rmw_client_data_t * client_data = static_cast(client->data); + if (client_data == nullptr) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Unable to retreive client_data from client for service %s", client->service_name); + return RMW_RET_INVALID_ARGUMENT; + } + + std::string service_type = client_data->request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), client->service_name); + return RMW_RET_INVALID_ARGUMENT; + } + + return node->context->impl->graph_cache.service_server_is_available( + client->service_name, service_type.c_str(), is_available); } //==============================================================================