From bb8a9a65251ea1d2633297e25a7c7d079ca3ccc6 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 14:22:15 +0800 Subject: [PATCH 01/15] Put and del liveliness tokens for service and clients Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 7 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 75 +++++++++++++++++++-- 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 3f76fd91..de45faba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -146,6 +146,9 @@ struct rmw_service_data_t z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; + // Liveliness token for the service. + zc_owned_liveliness_token_t token; + const void * request_type_support_impl; const void * response_type_support_impl; const char * typesupport_identifier; @@ -171,9 +174,11 @@ struct rmw_service_data_t struct rmw_client_data_t { z_owned_keyexpr_t keyexpr; - z_owned_closure_reply_t zn_closure_reply; + // Liveliness token for the client. + zc_owned_liveliness_token_t token; + std::mutex message_mutex; std::deque replies; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 38a6c662..8fed6d54 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1851,6 +1851,40 @@ rmw_create_client( return nullptr; } + // Note: The typename in the liveliness token is the that of the request. + // When updating the graph cache, the _Response suffix will be removed such + // that the types of clients and services will match. + const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), + liveliness::EntityType::Client, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_client->service_name, + client_data->request_type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the client."); + return nullptr; + } + client_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(liveliness_entity->keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [client_data]() { + if (client_data != nullptr) { + z_drop(z_move(client_data->token)); + } + }); + if (!z_check(client_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the client."); + return nullptr; + } + rmw_client->data = client_data; free_rmw_client.cancel(); @@ -1901,6 +1935,7 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) z_reply_drop(&reply); } client_data->replies.clear(); + z_drop(z_move(client_data->token)); allocator->deallocate(client_data->request_type_support, allocator->state); allocator->deallocate(client_data->response_type_support, allocator->state); @@ -2388,7 +2423,39 @@ rmw_create_service( z_undeclare_queryable(z_move(service_data->qable)); }); - // TODO(francocipollone): Update graph cache. + // Note: The typename in the liveliness token is the that of the request. + // When updating the graph cache, the _Response suffix will be removed such + // that the types of clients and services will match. + const auto liveliness_entity = liveliness::Entity::make( + z_info_zid(z_loan(node->context->impl->session)), + liveliness::EntityType::Service, + liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, + liveliness::TopicInfo{rmw_service->service_name, + service_data->response_type_support->get_name(), "reliable"} + ); + if (!liveliness_entity.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the service."); + return nullptr; + } + service_data->token = zc_liveliness_declare_token( + z_loan(node->context->impl->session), + z_keyexpr(liveliness_entity->keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [service_data]() { + if (service_data != nullptr) { + z_drop(z_move(service_data->token)); + } + }); + if (!z_check(service_data->token)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the service."); + return nullptr; + } rmw_service->data = service_data; @@ -2402,6 +2469,8 @@ rmw_create_service( free_response_type_support.cancel(); free_ros_keyexpr.cancel(); undeclare_z_queryable.cancel(); + free_token.cancel(); + return rmw_service; } @@ -2439,8 +2508,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) for (z_owned_query_t & query : service_data->query_queue) { z_drop(z_move(query)); } - service_data->query_queue.clear(); - service_data->sequence_to_query_map.clear(); + z_drop(z_move(service_data->token)); allocator->deallocate(service_data->request_type_support, allocator->state); allocator->deallocate(service_data->response_type_support, allocator->state); @@ -2449,7 +2517,6 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) allocator->deallocate(const_cast(service->service_name), allocator->state); allocator->deallocate(service, allocator->state); - // TODO(francocipollone): Update graph cache. return RMW_RET_OK; } From 677c05e93048058a3bd6d55755bea6f90c768148 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 14:50:44 +0800 Subject: [PATCH 02/15] Remove suffix from service type in liveliness tokens Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 40 ++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 8fed6d54..78046077 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1851,15 +1851,27 @@ rmw_create_client( return nullptr; } - // Note: The typename in the liveliness token is the that of the request. - // When updating the graph cache, the _Response suffix will be removed such - // that the types of clients and services will match. + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = client_data->request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } + else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), rmw_client->service_name); + return nullptr; + } const auto liveliness_entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, - client_data->request_type_support->get_name(), "reliable"} + std::move(service_type), "reliable"} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2423,15 +2435,27 @@ rmw_create_service( z_undeclare_queryable(z_move(service_data->qable)); }); - // Note: The typename in the liveliness token is the that of the request. - // When updating the graph cache, the _Response suffix will be removed such - // that the types of clients and services will match. + // Note: Service request/response types will contain a suffix Request_ or Response_. + // We remove the suffix when appending the type to the liveliness tokens for + // better reusability within GraphCache. + std::string service_type = service_data->response_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Response_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } + else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for service %s. Report this bug", + service_type.c_str(), rmw_service->service_name); + return nullptr; + } const auto liveliness_entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, - service_data->response_type_support->get_name(), "reliable"} + std::move(service_type), "reliable"} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( From af1b58290497e042305a958dc82d65e3cbe49b54 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 16:15:30 +0800 Subject: [PATCH 03/15] Update GraphCache with service and client information Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 113 ++++++++++++++---- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 16 ++- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 2 +- 3 files changed, 104 insertions(+), 27 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 35b18c4f..d68845a7 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -70,7 +70,13 @@ void GraphCache::parse_put(const std::string & keyexpr) auto add_topic_data = [this](const Entity & entity, GraphNode & graph_node) -> void { - if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + if (entity.type() != EntityType::Publisher && + entity.type() != EntityType::Subscription && + entity.type() != EntityType::Service && + entity.type() != EntityType::Client) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "add_topic_data() for invalid EntityType. Report this."); return; } @@ -84,12 +90,33 @@ void GraphCache::parse_put(const std::string & keyexpr) } const liveliness::TopicInfo topic_info = entity.topic_info().value(); - GraphNode::TopicMap & topic_map = entity.type() == - EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; - const std::string entity_desc = entity.type() == - EntityType::Publisher ? "publisher" : "subscription"; - const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + std::string entity_desc = ""; + GraphNode::TopicMap & topic_map = + [&]() -> GraphNode::TopicMap& + { + if (entity.type() == EntityType::Publisher) { + entity_desc = "publisher"; + return graph_node.pubs_; + } + else if (entity.type() == EntityType::Subscription) { + entity_desc = "subscription"; + return graph_node.subs_; + } + else if (entity.type() == EntityType::Service) { + entity_desc = "service"; + return graph_node.services_; + + } + else { + entity_desc = "client"; + return graph_node.clients_; + } + }(); + // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. + // Similarly, subscriptions and services are equivalent. + const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; + TopicDataPtr graph_topic_data = std::make_shared( topic_info, TopicStats{pub_count, sub_count}); @@ -115,14 +142,18 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph - GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_); - if (cache_topic_it == graph_topics_.end()) { + GraphNode::TopicMap & graph_endpoints = + entity.type() == EntityType::Publisher || entity.type() == EntityType::Subscription ? + graph_topics_ : + graph_services_; + GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_); + if (cache_topic_it == graph_endpoints.end()) { // First time this topic name is added to the graph. std::shared_ptr topic_data_ptr = std::make_shared( topic_info, TopicStats{pub_count, sub_count} ); - graph_topics_[topic_info.name_] = GraphNode::TopicDataMap{ + graph_endpoints[topic_info.name_] = GraphNode::TopicDataMap{ {topic_info.type_, topic_data_ptr} }; } else { @@ -153,6 +184,7 @@ void GraphCache::parse_put(const std::string & keyexpr) }; // Helper lambda to convert an Entity into a GraphNode. + // Note: this will update bookkeeping variables in GraphCache. auto make_graph_node = [&](const Entity & entity) -> std::shared_ptr { @@ -166,7 +198,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // Token was for a node. return graph_node; } - // Add pub/sub entries. + // Add endpoint entries. add_topic_data(entity, *graph_node); return graph_node; @@ -257,15 +289,19 @@ void GraphCache::parse_del(const std::string & keyexpr) // Helper lambda to update graph_topics_. auto update_graph_topics = - [&](const liveliness::TopicInfo topic_info, std::size_t pub_count, + [&](const liveliness::TopicInfo topic_info, const EntityType entity_type, std::size_t pub_count, std::size_t sub_count) -> void { + GraphNode::TopicMap & graph_endpoints = + entity_type == EntityType::Publisher || entity_type == EntityType::Subscription ? + graph_topics_ : + graph_services_; GraphNode::TopicMap::iterator cache_topic_it = - graph_topics_.find(topic_info.name_); - if (cache_topic_it == graph_topics_.end()) { + graph_endpoints.find(topic_info.name_); + if (cache_topic_it == graph_endpoints.end()) { // This should not happen. RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.", + "rmw_zenoh_cpp", "topic_key %s not found in graph_endpoints. Report this.", topic_info.name_.c_str()); } else { GraphNode::TopicDataMap::iterator cache_topic_data_it = @@ -281,7 +317,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // If the topic does not have any TopicData entries, erase the topic from the map. if (cache_topic_it->second.empty()) { - graph_topics_.erase(cache_topic_it); + graph_endpoints.erase(cache_topic_it); } } } @@ -292,7 +328,13 @@ void GraphCache::parse_del(const std::string & keyexpr) auto remove_topic_data = [&](const Entity & entity, GraphNode & graph_node) -> void { - if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) { + if (entity.type() != EntityType::Publisher && + entity.type() != EntityType::Subscription && + entity.type() != EntityType::Service && + entity.type() != EntityType::Client) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "remove_topic_data() for invalid EntityType. Report this."); return; } @@ -304,13 +346,32 @@ void GraphCache::parse_del(const std::string & keyexpr) "remove_topic_data() called without valid TopicInfo. Report this."); return; } - const liveliness::TopicInfo topic_info = entity.topic_info().value(); - GraphNode::TopicMap & topic_map = entity.type() == - EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_; - const std::string entity_desc = entity.type() == - EntityType::Publisher ? "publisher" : "subscription"; - const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0; + std::string entity_desc = ""; + GraphNode::TopicMap & topic_map = + [&]() -> GraphNode::TopicMap& + { + if (entity.type() == EntityType::Publisher) { + entity_desc = "publisher"; + return graph_node.pubs_; + } + else if (entity.type() == EntityType::Subscription) { + entity_desc = "subscription"; + return graph_node.subs_; + } + else if (entity.type() == EntityType::Service) { + entity_desc = "service"; + return graph_node.services_; + + } + else { + entity_desc = "client"; + return graph_node.clients_; + } + }(); + // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. + // Similarly, subscriptions and services are equivalent. + const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); @@ -346,7 +407,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - update_graph_topics(topic_info, pub_count, sub_count); + update_graph_topics(topic_info, entity.type(), pub_count, sub_count); RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -402,18 +463,20 @@ void GraphCache::parse_del(const std::string & keyexpr) ); auto remove_topics = [&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void { - std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0; + std::size_t pub_count = entity_type == EntityType::Publisher || entity_type == EntityType::Client ? 1 : 0; std::size_t sub_count = !pub_count; for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) { for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); ++type_it) { - update_graph_topics(type_it->second->info_, pub_count, sub_count); + update_graph_topics(type_it->second->info_, entity_type, pub_count, sub_count); } } }; remove_topics(graph_node->pubs_, EntityType::Publisher); remove_topics(graph_node->subs_, EntityType::Subscription); + remove_topics(graph_node->services_, EntityType::Service); + remove_topics(graph_node->clients_, EntityType::Client); } ns_it->second.erase(node_it); RCUTILS_LOG_WARN_NAMED( diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index ca65c0f9..485447fc 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -34,9 +34,14 @@ ///============================================================================= +// TODO(Yadunund): Since we reuse pub_count_ and sub_count_ for pub/sub and +// service/client consider more general names for these fields. struct TopicStats { + // The count of publishers or clients. std::size_t pub_count_; + + // The count of subscriptions or services. std::size_t sub_count_; // Constructor which initializes counters to 0. @@ -69,8 +74,15 @@ struct GraphNode using TopicDataMap = std::unordered_map; // Map topic name to TopicDataMap using TopicMap = std::unordered_map; + + // Entries for pub/sub. TopicMap pubs_ = {}; TopicMap subs_ = {}; + + // Entires for service/client. + TopicMap clients_ = {}; + TopicMap services_ = {}; + }; using GraphNodePtr = std::shared_ptr; @@ -146,8 +158,10 @@ class GraphCache final // Map namespace to a map of . NamespaceMap graph_ = {}; - // Optimize topic lookups across the graph. + // Optimize pub/sub lookups across the graph. GraphNode::TopicMap graph_topics_ = {}; + // Optimize service/client lookups across the graph. + GraphNode::TopicMap graph_services_ = {}; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index b1b1b0f1..fac73fe1 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -255,7 +255,7 @@ std::optional Entity::make(const std::string & keyexpr) if (entity_it == str_to_entity.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Received liveliness token with invalid entity."); + "Received liveliness token with invalid entity %s.", entity_str.c_str()); return std::nullopt; } From 0beabd7ed3fe4e2656e582fb8fddf07a58e42956 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 16:24:57 +0800 Subject: [PATCH 04/15] Update get_entity_names_and_types Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index d68845a7..2e781ae6 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -800,6 +800,10 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node( return fill_names_and_types(node_it->second->pubs_, allocator, names_and_types); } else if (entity_type == EntityType::Subscription) { return fill_names_and_types(node_it->second->subs_, allocator, names_and_types); + } else if (entity_type == EntityType::Service) { + return fill_names_and_types(node_it->second->services_, allocator, names_and_types); + } else if (entity_type == EntityType::Client) { + return fill_names_and_types(node_it->second->clients_, allocator, names_and_types); } else { return RMW_RET_OK; } @@ -850,7 +854,6 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( } } - rmw_ret_t ret = rmw_topic_endpoint_info_array_init_with_size( endpoints_info, nodes.size(), From e359d5c8a38c3d1ddaecdca30a37c8b6524ffaa7 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 20:56:52 +0800 Subject: [PATCH 05/15] Mangle node and topic names Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 37 +++++++++++++++++-- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 6 +++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index fac73fe1..a6435953 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -66,6 +66,7 @@ static const char PUB_STR[] = "MP"; static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; +static const char SLASH_REPLACEMENT = '%'; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -149,12 +150,12 @@ Entity::Entity( token_ss << "/"; } // Finally append node name. - token_ss << node_info_.name_; + token_ss << mangle_name(node_info_.name_); // If this entity has a topic info, append it to the token. if (topic_info_.has_value()) { const auto & topic_info = this->topic_info_.value(); // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". - token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_; + token_ss << "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; } this->keyexpr_ = token_ss.str(); @@ -263,7 +264,7 @@ std::optional Entity::make(const std::string & keyexpr) std::size_t domain_id = std::stoul(parts[1]); std::string & id = parts[2]; std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]); - std::string node_name = std::move(parts[5]); + std::string node_name = demangle_name(std::move(parts[5])); std::optional topic_info = std::nullopt; // Populate topic_info if we have a token for an entity other than a node. @@ -275,7 +276,7 @@ std::optional Entity::make(const std::string & keyexpr) return std::nullopt; } topic_info = TopicInfo{ - "/" + std::move(parts[6]), + demangle_name(std::move(parts[6])), std::move(parts[7]), std::move(parts[8])}; } @@ -387,4 +388,32 @@ bool PublishToken::del( return true; } +///============================================================================= +std::string mangle_name(const std::string& input) { + std::string output = ""; + for (std::size_t i = 0; i < input.length(); ++i) { + if (input[i] == '/') { + output += SLASH_REPLACEMENT; + } + else { + output += input[i]; + } + } + return output; +} + +///============================================================================= +std::string demangle_name(const std::string& input) { + std::string output = ""; + for (std::size_t i = 0; i < input.length(); ++i) { + if (input[i] == SLASH_REPLACEMENT) { + output += '/'; + } + else { + output += input[i]; + } + } + return output; +} + } // namespace liveliness diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 13e09cb4..f3e2c05a 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -133,6 +133,12 @@ class PublishToken const std::string & token); }; +/// Replace "/" instances with "§". +std::string mangle_name(const std::string& input); + +/// Replace "§" instances with "/". +std::string demangle_name(const std::string& input); + } // namespace liveliness #endif // DETAIL__LIVELINESS_UTILS_HPP_ From 9cca466b3c77403d1d43ea22736c499cc10363e7 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 21:13:01 +0800 Subject: [PATCH 06/15] Implement get_service_names_and_types Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 12 ++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 4 ++++ .../src/rmw_get_service_names_and_types.cpp | 14 ++++++++++---- .../src/rmw_get_topic_names_and_types.cpp | 4 ---- 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 2e781ae6..ed2dec3d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -708,6 +708,18 @@ rmw_ret_t GraphCache::get_topic_names_and_types( return fill_names_and_types(graph_topics_, allocator, topic_names_and_types); } +///============================================================================= +rmw_ret_t GraphCache::get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const +{ + RCUTILS_CHECK_ALLOCATOR_WITH_MSG( + allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); + + std::lock_guard lock(graph_mutex_); + return fill_names_and_types(graph_services_, allocator, service_names_and_types); +} + ///============================================================================= rmw_ret_t GraphCache::count_publishers( const char * topic_name, diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 485447fc..b5c41717 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -106,6 +106,10 @@ class GraphCache final bool no_demangle, rmw_names_and_types_t * topic_names_and_types) const; + rmw_ret_t get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const; + rmw_ret_t count_publishers( const char * topic_name, size_t * count) const; diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index ae8e54c4..9cf19aae 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "detail/rmw_data_types.hpp" +#include "rmw/error_handling.h" #include "rmw/get_service_names_and_types.h" extern "C" @@ -25,9 +27,13 @@ rmw_get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types) { - static_cast(node); - static_cast(allocator); - static_cast(service_names_and_types); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.get_service_names_and_types( + allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index a4c80b4e..c5218e1e 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -14,13 +14,9 @@ #include "detail/rmw_data_types.hpp" -#include "rcutils/strdup.h" - #include "rmw/error_handling.h" #include "rmw/get_topic_names_and_types.h" -#include "rcpputils/scope_exit.hpp" - extern "C" { ///============================================================================== From 2175a2795aca20137f0b6f834284b5d2484e11b7 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 21:30:48 +0800 Subject: [PATCH 07/15] Implement count methods for services and clients Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 34 +++++++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 8 ++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 48 ++++++++++++++++++++---- 3 files changed, 82 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index ed2dec3d..007848ab 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -754,6 +754,40 @@ rmw_ret_t GraphCache::count_subscriptions( return RMW_RET_OK; } +///============================================================================= +rmw_ret_t GraphCache::count_services( + const char * service_name, + size_t * count) const +{ + *count = 0; + std::lock_guard lock(graph_mutex_); + if (graph_services_.count(service_name) != 0) { + for (const std::pair & it : graph_services_.at(service_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.sub_count_; + } + } + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t GraphCache::count_clients( + const char * service_name, + size_t * count) const +{ + *count = 0; + std::lock_guard lock(graph_mutex_); + if (graph_services_.count(service_name) != 0) { + for (const std::pair & it : graph_services_.at(service_name)) { + // Iterate through all the types and increment count. + *count += it.second->stats_.pub_count_; + } + } + + return RMW_RET_OK; +} + ///============================================================================= rmw_ret_t GraphCache::get_entity_names_and_types_by_node( liveliness::EntityType entity_type, diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index b5c41717..56c41235 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -118,6 +118,14 @@ class GraphCache final const char * topic_name, size_t * count) const; + rmw_ret_t count_services( + const char * service_name, + size_t * count) const; + + rmw_ret_t count_clients( + const char * service_name, + size_t * count) const; + rmw_ret_t get_entity_names_and_types_by_node( liveliness::EntityType entity_type, rcutils_allocator_t * allocator, diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 78046077..b105729d 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3208,10 +3208,26 @@ rmw_count_clients( const char * service_name, size_t * count) { - static_cast(node); - static_cast(service_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_clients(service_name, count); } //============================================================================== @@ -3222,10 +3238,26 @@ rmw_count_services( const char * service_name, size_t * count) { - static_cast(node); - static_cast(service_name); - static_cast(count); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, RMW_RET_INVALID_ARGUMENT); + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return ret; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("topic_name argument is invalid: %s", reason); + return RMW_RET_INVALID_ARGUMENT; + } + RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.count_services(service_name, count); } //============================================================================== From 5f936c36219bcffdb57be6696bafdc57b8e4a986 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 21:50:20 +0800 Subject: [PATCH 08/15] Implement check for service availability Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 20 +++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 5 ++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 37 ++++++++++++++++++++---- 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 007848ab..6a2f9bc9 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -963,3 +963,23 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( cleanup_endpoints_info.cancel(); return RMW_RET_OK; } + + +///============================================================================= +rmw_ret_t GraphCache::service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available) +{ + *is_available = false; + std::lock_guard lock(graph_mutex_); + GraphNode::TopicMap::iterator service_it = graph_services_.find(service_name); + if (service_it != graph_services_.end()){ + GraphNode::TopicDataMap::iterator type_it = service_it->second.find(service_type); + if (type_it != service_it->second.end()){ + *is_available = true; + } + } + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 56c41235..adeb3e01 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -141,6 +141,11 @@ class GraphCache final bool no_demangle, rmw_topic_endpoint_info_array_t * endpoints_info) const; + rmw_ret_t service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available); + private: /* namespace_1: diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index b105729d..26bdc9ba 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3300,12 +3300,37 @@ rmw_service_server_is_available( const rmw_client_t * client, bool * is_available) { - // TODO(francocipollone): Provide a proper implementation. - // We need graph cache information for this. - *is_available = true; - static_cast(node); - static_cast(client); - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(is_available, RMW_RET_INVALID_ARGUMENT); + + rmw_client_data_t * client_data = static_cast(client->data); + if (client_data == nullptr) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Unable to retreive client_data from client for service %s", client->service_name); + return RMW_RET_INVALID_ARGUMENT; + } + + std::string service_type = client_data->request_type_support->get_name(); + size_t suffix_substring_position = service_type.find("Request_"); + if (std::string::npos != suffix_substring_position) { + service_type = service_type.substr(0, suffix_substring_position); + } + else { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unexpected type %s for client %s. Report this bug", + service_type.c_str(), client->service_name); + return RMW_RET_INVALID_ARGUMENT; + } + + return node->context->impl->graph_cache.service_server_is_available( + client->service_name, service_type.c_str(), is_available); } //============================================================================== From 801cf2f720e6f96ffbbdbf3b8eb1349f1e8b0641 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 28 Dec 2023 21:50:48 +0800 Subject: [PATCH 09/15] style Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 41 +++++++++---------- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 15 +++---- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 4 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 18 ++++---- 4 files changed, 38 insertions(+), 40 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 6a2f9bc9..3c552a90 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -73,7 +73,8 @@ void GraphCache::parse_put(const std::string & keyexpr) if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription && entity.type() != EntityType::Service && - entity.type() != EntityType::Client) { + entity.type() != EntityType::Client) + { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "add_topic_data() for invalid EntityType. Report this."); @@ -92,29 +93,27 @@ void GraphCache::parse_put(const std::string & keyexpr) const liveliness::TopicInfo topic_info = entity.topic_info().value(); std::string entity_desc = ""; GraphNode::TopicMap & topic_map = - [&]() -> GraphNode::TopicMap& + [&]() -> GraphNode::TopicMap & { if (entity.type() == EntityType::Publisher) { entity_desc = "publisher"; return graph_node.pubs_; - } - else if (entity.type() == EntityType::Subscription) { + } else if (entity.type() == EntityType::Subscription) { entity_desc = "subscription"; return graph_node.subs_; - } - else if (entity.type() == EntityType::Service) { + } else if (entity.type() == EntityType::Service) { entity_desc = "service"; return graph_node.services_; - } - else { + } else { entity_desc = "client"; return graph_node.clients_; } }(); // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. // Similarly, subscriptions and services are equivalent. - const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; + const std::size_t pub_count = entity.type() == EntityType::Publisher || + entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; TopicDataPtr graph_topic_data = std::make_shared( @@ -331,7 +330,8 @@ void GraphCache::parse_del(const std::string & keyexpr) if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription && entity.type() != EntityType::Service && - entity.type() != EntityType::Client) { + entity.type() != EntityType::Client) + { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "remove_topic_data() for invalid EntityType. Report this."); @@ -349,29 +349,27 @@ void GraphCache::parse_del(const std::string & keyexpr) const liveliness::TopicInfo topic_info = entity.topic_info().value(); std::string entity_desc = ""; GraphNode::TopicMap & topic_map = - [&]() -> GraphNode::TopicMap& + [&]() -> GraphNode::TopicMap & { if (entity.type() == EntityType::Publisher) { entity_desc = "publisher"; return graph_node.pubs_; - } - else if (entity.type() == EntityType::Subscription) { + } else if (entity.type() == EntityType::Subscription) { entity_desc = "subscription"; return graph_node.subs_; - } - else if (entity.type() == EntityType::Service) { + } else if (entity.type() == EntityType::Service) { entity_desc = "service"; return graph_node.services_; - } - else { + } else { entity_desc = "client"; return graph_node.clients_; } }(); // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. // Similarly, subscriptions and services are equivalent. - const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; + const std::size_t pub_count = entity.type() == EntityType::Publisher || + entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_); @@ -463,7 +461,8 @@ void GraphCache::parse_del(const std::string & keyexpr) ); auto remove_topics = [&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void { - std::size_t pub_count = entity_type == EntityType::Publisher || entity_type == EntityType::Client ? 1 : 0; + std::size_t pub_count = entity_type == EntityType::Publisher || + entity_type == EntityType::Client ? 1 : 0; std::size_t sub_count = !pub_count; for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) { for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); @@ -974,9 +973,9 @@ rmw_ret_t GraphCache::service_server_is_available( *is_available = false; std::lock_guard lock(graph_mutex_); GraphNode::TopicMap::iterator service_it = graph_services_.find(service_name); - if (service_it != graph_services_.end()){ + if (service_it != graph_services_.end()) { GraphNode::TopicDataMap::iterator type_it = service_it->second.find(service_type); - if (type_it != service_it->second.end()){ + if (type_it != service_it->second.end()) { *is_available = true; } } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index a6435953..7118b64b 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -155,7 +155,8 @@ Entity::Entity( if (topic_info_.has_value()) { const auto & topic_info = this->topic_info_.value(); // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". - token_ss << "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; + token_ss << + "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; } this->keyexpr_ = token_ss.str(); @@ -389,13 +390,13 @@ bool PublishToken::del( } ///============================================================================= -std::string mangle_name(const std::string& input) { +std::string mangle_name(const std::string & input) +{ std::string output = ""; for (std::size_t i = 0; i < input.length(); ++i) { if (input[i] == '/') { output += SLASH_REPLACEMENT; - } - else { + } else { output += input[i]; } } @@ -403,13 +404,13 @@ std::string mangle_name(const std::string& input) { } ///============================================================================= -std::string demangle_name(const std::string& input) { +std::string demangle_name(const std::string & input) +{ std::string output = ""; for (std::size_t i = 0; i < input.length(); ++i) { if (input[i] == SLASH_REPLACEMENT) { output += '/'; - } - else { + } else { output += input[i]; } } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index f3e2c05a..c8bd12b4 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -134,10 +134,10 @@ class PublishToken }; /// Replace "/" instances with "§". -std::string mangle_name(const std::string& input); +std::string mangle_name(const std::string & input); /// Replace "§" instances with "/". -std::string demangle_name(const std::string& input); +std::string demangle_name(const std::string & input); } // namespace liveliness diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 26bdc9ba..579cbb55 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1858,13 +1858,12 @@ rmw_create_client( size_t suffix_substring_position = service_type.find("Request_"); if (std::string::npos != suffix_substring_position) { service_type = service_type.substr(0, suffix_substring_position); - } - else { + } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unexpected type %s for client %s. Report this bug", service_type.c_str(), rmw_client->service_name); - return nullptr; + return nullptr; } const auto liveliness_entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), @@ -2442,13 +2441,12 @@ rmw_create_service( size_t suffix_substring_position = service_type.find("Response_"); if (std::string::npos != suffix_substring_position) { service_type = service_type.substr(0, suffix_substring_position); - } - else { + } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unexpected type %s for service %s. Report this bug", service_type.c_str(), rmw_service->service_name); - return nullptr; + return nullptr; } const auto liveliness_entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), @@ -3312,7 +3310,8 @@ rmw_service_server_is_available( rmw_client_data_t * client_data = static_cast(client->data); if (client_data == nullptr) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Unable to retreive client_data from client for service %s", client->service_name); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Unable to retreive client_data from client for service %s", client->service_name); return RMW_RET_INVALID_ARGUMENT; } @@ -3320,13 +3319,12 @@ rmw_service_server_is_available( size_t suffix_substring_position = service_type.find("Request_"); if (std::string::npos != suffix_substring_position) { service_type = service_type.substr(0, suffix_substring_position); - } - else { + } else { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unexpected type %s for client %s. Report this bug", service_type.c_str(), client->service_name); - return RMW_RET_INVALID_ARGUMENT; + return RMW_RET_INVALID_ARGUMENT; } return node->context->impl->graph_cache.service_server_is_available( From 641e542a6301f1bb1d76093166f33395092492af Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 5 Jan 2024 00:56:47 +0800 Subject: [PATCH 10/15] Set channel bound to 0 for zc_liveliness_get Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_init.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 1e6da94c..201118c6 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -255,7 +255,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + // Without setting the bound value to 0, the liveliness get call + // block execution where there are more than 3 nodes in the graph. + // From the zenoh-c documentation: If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + // TODO(Yadunund): Investigate why this is the case and try switching to callbacks instead. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(channel.send), NULL); From 26fca70fc2ebe6124cba20c99c5ad9f85782fed9 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 5 Jan 2024 14:00:09 +0800 Subject: [PATCH 11/15] Cleanup Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 1 - rmw_zenoh_cpp/src/rmw_init.cpp | 9 +++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 3c552a90..80cd089a 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -963,7 +963,6 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( return RMW_RET_OK; } - ///============================================================================= rmw_ret_t GraphCache::service_server_is_available( const char * service_name, diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 201118c6..1b612d88 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -255,10 +255,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); - // Without setting the bound value to 0, the liveliness get call - // block execution where there are more than 3 nodes in the graph. - // From the zenoh-c documentation: If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - // TODO(Yadunund): Investigate why this is the case and try switching to callbacks instead. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. z_owned_reply_channel_t channel = zc_reply_fifo_new(0); zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), From efb9605c2b84198a4830cc6ea06e3b129e30991a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 15 Jan 2024 13:51:23 +0800 Subject: [PATCH 12/15] Set consolidation and other options for queryables Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 579cbb55..5653fae6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2064,7 +2064,10 @@ rmw_send_request( opts.attachment = z_bytes_map_as_attachment(&map); - opts.target = Z_QUERY_TARGET_ALL; + opts.target = Z_QUERY_TARGET_ALL_COMPLETE; + // Latest consolidation guarantees unicity of replies for the same key expression. It optimizes bandwidth. + // Default is None which imples replies may come in any order and any number. + opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); @@ -2418,12 +2421,14 @@ rmw_create_service( } z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data); - + // Configure the queryable to process complete queries. + z_queryable_options_t qable_options = z_queryable_options_default(); + qable_options.complete = true; service_data->qable = z_declare_queryable( z_loan(context_impl->session), z_loan(service_data->keyexpr), z_move(callback), - nullptr); + &qable_options); if (!z_check(service_data->qable)) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); From 4d01e779cfc481b3d0d39ef39bfbafa3530cfb90 Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 17 Jan 2024 10:18:44 +0800 Subject: [PATCH 13/15] Update rmw_zenoh_cpp/src/detail/liveliness_utils.hpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu --- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index c8bd12b4..0d4202c6 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -136,7 +136,7 @@ class PublishToken /// Replace "/" instances with "§". std::string mangle_name(const std::string & input); -/// Replace "§" instances with "/". +/// Replace "%" instances with "/". std::string demangle_name(const std::string & input); } // namespace liveliness From e29234a159d09a474e6f06d11a72e1f4c2f4535b Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 17 Jan 2024 10:18:52 +0800 Subject: [PATCH 14/15] Update rmw_zenoh_cpp/src/detail/liveliness_utils.hpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu --- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 0d4202c6..66f2f11b 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -133,7 +133,7 @@ class PublishToken const std::string & token); }; -/// Replace "/" instances with "§". +/// Replace "/" instances with "%". std::string mangle_name(const std::string & input); /// Replace "%" instances with "/". From aae76cdfd1a9d5501302116846a6f5db0634359b Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 17 Jan 2024 16:10:58 +0800 Subject: [PATCH 15/15] Explicitly capture by reference in lambdas Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 56 +++++++++++++----------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 80cd089a..b34e68d4 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -68,7 +68,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update graph_topics_ auto add_topic_data = - [this](const Entity & entity, GraphNode & graph_node) -> void + [](const Entity & entity, GraphNode & graph_node, GraphCache & graph_cache) -> void { if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription && @@ -93,7 +93,8 @@ void GraphCache::parse_put(const std::string & keyexpr) const liveliness::TopicInfo topic_info = entity.topic_info().value(); std::string entity_desc = ""; GraphNode::TopicMap & topic_map = - [&]() -> GraphNode::TopicMap & + [](const Entity & entity, GraphNode & graph_node, + std::string & entity_desc) -> GraphNode::TopicMap & { if (entity.type() == EntityType::Publisher) { entity_desc = "publisher"; @@ -109,7 +110,7 @@ void GraphCache::parse_put(const std::string & keyexpr) entity_desc = "client"; return graph_node.clients_; } - }(); + }(entity, graph_node, entity_desc); // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. // Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || @@ -143,8 +144,8 @@ void GraphCache::parse_put(const std::string & keyexpr) // Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph GraphNode::TopicMap & graph_endpoints = entity.type() == EntityType::Publisher || entity.type() == EntityType::Subscription ? - graph_topics_ : - graph_services_; + graph_cache.graph_topics_ : + graph_cache.graph_services_; GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_); if (cache_topic_it == graph_endpoints.end()) { // First time this topic name is added to the graph. @@ -185,7 +186,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // Helper lambda to convert an Entity into a GraphNode. // Note: this will update bookkeeping variables in GraphCache. auto make_graph_node = - [&](const Entity & entity) -> std::shared_ptr + [&add_topic_data](const Entity & entity, GraphCache & graph_cache) -> std::shared_ptr { auto graph_node = std::make_shared(); graph_node->id_ = entity.id(); @@ -198,7 +199,7 @@ void GraphCache::parse_put(const std::string & keyexpr) return graph_node; } // Add endpoint entries. - add_topic_data(entity, *graph_node); + add_topic_data(entity, *graph_node, graph_cache); return graph_node; }; @@ -210,7 +211,7 @@ void GraphCache::parse_put(const std::string & keyexpr) NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { NodeMap node_map = { - {entity.node_name(), make_graph_node(entity)}}; + {entity.node_name(), make_graph_node(entity, *this)}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", @@ -236,7 +237,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // Either the first time a node with this name is added or with an existing // name but unique id. NodeMap::iterator insertion_it = - ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity))); + ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); if (insertion_it != ns_it->second.end()) { RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -273,7 +274,7 @@ void GraphCache::parse_put(const std::string & keyexpr) } // Update the graph based on the entity. - add_topic_data(entity, *(node_it->second)); + add_topic_data(entity, *(node_it->second), *this); } ///============================================================================= @@ -288,13 +289,13 @@ void GraphCache::parse_del(const std::string & keyexpr) // Helper lambda to update graph_topics_. auto update_graph_topics = - [&](const liveliness::TopicInfo topic_info, const EntityType entity_type, std::size_t pub_count, - std::size_t sub_count) -> void + [](const liveliness::TopicInfo topic_info, const EntityType entity_type, std::size_t pub_count, + std::size_t sub_count, GraphCache & graph_cache) -> void { GraphNode::TopicMap & graph_endpoints = entity_type == EntityType::Publisher || entity_type == EntityType::Subscription ? - graph_topics_ : - graph_services_; + graph_cache.graph_topics_ : + graph_cache.graph_services_; GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_); if (cache_topic_it == graph_endpoints.end()) { @@ -325,7 +326,8 @@ void GraphCache::parse_del(const std::string & keyexpr) // Helper lambda to append pub/subs to the GraphNode. // We capture by reference to update caches like graph_topics_ if update_cache is true. auto remove_topic_data = - [&](const Entity & entity, GraphNode & graph_node) -> void + [&update_graph_topics](const Entity & entity, GraphNode & graph_node, + GraphCache & graph_cache) -> void { if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription && @@ -349,7 +351,8 @@ void GraphCache::parse_del(const std::string & keyexpr) const liveliness::TopicInfo topic_info = entity.topic_info().value(); std::string entity_desc = ""; GraphNode::TopicMap & topic_map = - [&]() -> GraphNode::TopicMap & + [](const Entity & entity, GraphNode & graph_node, + std::string & entity_desc) -> GraphNode::TopicMap & { if (entity.type() == EntityType::Publisher) { entity_desc = "publisher"; @@ -365,7 +368,7 @@ void GraphCache::parse_del(const std::string & keyexpr) entity_desc = "client"; return graph_node.clients_; } - }(); + }(entity, graph_node, entity_desc); // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. // Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || @@ -405,7 +408,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. - update_graph_topics(topic_info, entity.type(), pub_count, sub_count); + update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache); RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", @@ -460,7 +463,8 @@ void GraphCache::parse_del(const std::string & keyexpr) entity.node_name().c_str() ); auto remove_topics = - [&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void { + [&update_graph_topics](const GraphNode::TopicMap & topic_map, + const EntityType & entity_type, GraphCache & graph_cache) -> void { std::size_t pub_count = entity_type == EntityType::Publisher || entity_type == EntityType::Client ? 1 : 0; std::size_t sub_count = !pub_count; @@ -468,14 +472,16 @@ void GraphCache::parse_del(const std::string & keyexpr) for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end(); ++type_it) { - update_graph_topics(type_it->second->info_, entity_type, pub_count, sub_count); + update_graph_topics( + type_it->second->info_, entity_type, pub_count, sub_count, + graph_cache); } } }; - remove_topics(graph_node->pubs_, EntityType::Publisher); - remove_topics(graph_node->subs_, EntityType::Subscription); - remove_topics(graph_node->services_, EntityType::Service); - remove_topics(graph_node->clients_, EntityType::Client); + remove_topics(graph_node->pubs_, EntityType::Publisher, *this); + remove_topics(graph_node->subs_, EntityType::Subscription, *this); + remove_topics(graph_node->services_, EntityType::Service, *this); + remove_topics(graph_node->clients_, EntityType::Client, *this); } ns_it->second.erase(node_it); RCUTILS_LOG_WARN_NAMED( @@ -495,7 +501,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } // Update the graph based on the entity. - remove_topic_data(entity, *(node_it->second)); + remove_topic_data(entity, *(node_it->second), *this); } ///=============================================================================