From 0e7a380342a7751ba7e1e63aa369c1abaa707804 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 19 Jan 2024 00:33:23 -0500 Subject: [PATCH] Improve service and subscription reliability (#92) * Switch to unique_ptr for the query_queue. This is not strictly required, but it does make ownership clearer. Signed-off-by: Chris Lalancette * Fix cpplint issues. Signed-off-by: Chris Lalancette * Use unique_ptr for replies. Signed-off-by: Chris Lalancette * Use custom classes to wrap the queries and replies. This allows us to do RAII and drop things in all paths properly. Signed-off-by: Chris Lalancette * Revamp rmw_shutdown and rmw_context_fini. rmw_context_fini is called during the atexit handler, so we can't do complex things like shutdown the session. Instead, switch to shutting down the session during rmw_shutdown. Signed-off-by: Chris Lalancette * Make sure to check z_reply_is_ok. Signed-off-by: Chris Lalancette * Add in header includes as needed. Signed-off-by: Chris Lalancette * Remove a lot of debugging statements. Signed-off-by: Chris Lalancette * Add in source timestamps to service requests. Signed-off-by: Chris Lalancette * Add in source_timestamp for services. Signed-off-by: Chris Lalancette * Run the constructor for rmw_publisher_data_t. Signed-off-by: Chris Lalancette * Don't pass unnecessary argument into __rmw_take. Signed-off-by: Chris Lalancette * Fix up memory leaks in clients, services, and the context. Signed-off-by: Chris Lalancette * Fill in the received_timetstamp. Signed-off-by: Chris Lalancette * Add in a writer_guid to the client and service requests. Signed-off-by: Chris Lalancette * Revamp rmw_wait. There are more comments in the code, but in particular this change makes sure we deal with data that is already ready, rather than always waiting. With this in place, services seem to work as promised. Signed-off-by: Chris Lalancette * Switch which end of the deque subscriptions push and pop from. Since it is a deque, it doesn't *really* matter whether we push from the back and pull from the front, or push from the front and pull from the back. Switch to pushing to the back and pulling from the front so we match what the clients and services are doing. Signed-off-by: Chris Lalancette * Remove unnecessary configuration file. Signed-off-by: Chris Lalancette --------- Signed-off-by: Chris Lalancette --- rmw_zenoh_config.json5 | 18 - rmw_zenoh_cpp/src/detail/graph_cache.cpp | 44 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 2 - rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 4 +- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 58 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 41 +- rmw_zenoh_cpp/src/detail/zenoh_config.cpp | 7 - .../src/detail/zenoh_router_check.cpp | 10 +- rmw_zenoh_cpp/src/rmw_event.cpp | 1 + .../src/rmw_get_network_flow_endpoints.cpp | 3 + .../src/rmw_get_node_info_and_types.cpp | 3 + .../src/rmw_get_service_names_and_types.cpp | 3 + .../src/rmw_get_topic_endpoint_info.cpp | 3 + .../src/rmw_get_topic_names_and_types.cpp | 4 +- rmw_zenoh_cpp/src/rmw_init.cpp | 64 +-- rmw_zenoh_cpp/src/rmw_init_options.cpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 527 ++++++++++++------ 17 files changed, 485 insertions(+), 309 deletions(-) delete mode 100644 rmw_zenoh_config.json5 diff --git a/rmw_zenoh_config.json5 b/rmw_zenoh_config.json5 deleted file mode 100644 index 9b8d8887..00000000 --- a/rmw_zenoh_config.json5 +++ /dev/null @@ -1,18 +0,0 @@ -{ - /// The node's mode (router, peer or client) - mode: "peer", - - /// The default timeout to apply to queries in milliseconds. - queries_default_timeout: 10000, - - /// Configure internal transport parameters - transport: { - qos: { - enabled: true, - }, - // Shared memory configuration - shared_memory: { - enabled: false, - }, - }, -} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b34e68d4..3282ade0 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -111,8 +111,8 @@ void GraphCache::parse_put(const std::string & keyexpr) return graph_node.clients_; } }(entity, graph_node, entity_desc); - // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. - // Similarly, subscriptions and services are equivalent. + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; @@ -172,15 +172,6 @@ void GraphCache::parse_put(const std::string & keyexpr) topic_data_insertion.first->second->stats_.sub_count_ += sub_count; } } - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Added %s on topic %s with type %s and qos %s to node /%s.", - entity_desc.c_str(), - topic_info.name_.c_str(), - topic_info.type_.c_str(), - topic_info.qos_.c_str(), - graph_node.name_.c_str()); }; // Helper lambda to convert an Entity into a GraphNode. @@ -213,10 +204,6 @@ void GraphCache::parse_put(const std::string & keyexpr) NodeMap node_map = { {entity.node_name(), make_graph_node(entity, *this)}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", - entity.node_name().c_str(), - entity.node_namespace().c_str()); return; } @@ -238,14 +225,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // name but unique id. NodeMap::iterator insertion_it = ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); - if (insertion_it != ns_it->second.end()) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Added a new node /%s with id %s to an existing namespace %s in the graph.", - entity.node_name().c_str(), - entity.id().c_str(), - entity.node_namespace().c_str()); - } else { + if (insertion_it == ns_it->second.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to add a new node /%s with id %s an " @@ -369,8 +349,8 @@ void GraphCache::parse_del(const std::string & keyexpr) return graph_node.clients_; } }(entity, graph_node, entity_desc); - // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. - // Similarly, subscriptions and services are equivalent. + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; @@ -409,15 +389,6 @@ void GraphCache::parse_del(const std::string & keyexpr) // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache); - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Removed %s on topic %s with type %s and qos %s to node /%s.", - entity_desc.c_str(), - topic_info.name_.c_str(), - topic_info.type_.c_str(), - topic_info.qos_.c_str(), - graph_node.name_.c_str()); }; // Lock the graph mutex before accessing the graph. @@ -484,11 +455,6 @@ void GraphCache::parse_del(const std::string & keyexpr) remove_topics(graph_node->clients_, EntityType::Client, *this); } ns_it->second.erase(node_it); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed node /%s from the graph.", - entity.node_name().c_str() - ); return; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index adeb3e01..85e08a15 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -61,7 +61,6 @@ struct TopicData using TopicDataPtr = std::shared_ptr; ///============================================================================= -// TODO(Yadunund): Expand to services and clients. struct GraphNode { std::string id_; @@ -82,7 +81,6 @@ struct GraphNode // Entires for service/client. TopicMap clients_ = {}; TopicMap services_ = {}; - }; using GraphNodePtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 7118b64b..00718196 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -348,7 +348,7 @@ bool PublishToken::put( RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); return false; } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); + z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { @@ -379,7 +379,7 @@ bool PublishToken::del( RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); return false; } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); + const z_delete_options_t options = z_delete_options_default(); if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index d26c9e0d..6e80a97c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -14,7 +14,9 @@ #include +#include #include +#include #include #include "rcpputils/scope_exit.hpp" @@ -63,12 +65,12 @@ void sub_data_handler( sub_data->queue_depth, z_loan(keystr)); - std::unique_ptr old = std::move(sub_data->message_queue.back()); + std::unique_ptr old = std::move(sub_data->message_queue.front()); z_drop(&old->payload); - sub_data->message_queue.pop_back(); + sub_data->message_queue.pop_front(); } - sub_data->message_queue.emplace_front( + sub_data->message_queue.emplace_back( std::make_unique( zc_sample_payload_rcinc(sample), sample->timestamp.time, sample->timestamp.id.id)); @@ -81,13 +83,24 @@ void sub_data_handler( } } +ZenohQuery::ZenohQuery(const z_query_t * query) +{ + query_ = z_query_clone(query); +} + +ZenohQuery::~ZenohQuery() +{ + z_drop(z_move(query_)); +} + +const z_query_t ZenohQuery::get_query() const +{ + return z_query_loan(&query_); +} + //============================================================================== void service_data_handler(const z_query_t * query, void * data) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[service_data_handler] triggered" - ); z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); auto drop_keystr = rcpputils::make_scope_exit( [&keystr]() { @@ -108,7 +121,7 @@ void service_data_handler(const z_query_t * query, void * data) // Get the query parameters and payload { std::lock_guard lock(service_data->query_queue_mutex); - service_data->query_queue.push_back(z_query_clone(query)); + service_data->query_queue.emplace_back(std::make_unique(query)); } { // Since we added new data, trigger the guard condition if it is available @@ -119,6 +132,31 @@ void service_data_handler(const z_query_t * query, void * data) } } +ZenohReply::ZenohReply(const z_owned_reply_t * reply) +{ + reply_ = *reply; +} + +ZenohReply::~ZenohReply() +{ + z_reply_drop(z_move(reply_)); +} + +std::optional ZenohReply::get_sample() const +{ + if (z_reply_is_ok(&reply_)) { + return z_reply_ok(&reply_); + } + + return std::nullopt; +} + +size_t rmw_client_data_t::get_next_sequence_number() +{ + std::lock_guard lock(sequence_number_mutex); + return sequence_number++; +} + //============================================================================== void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -145,9 +183,9 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } { - std::lock_guard msg_lock(client_data->message_mutex); + std::lock_guard msg_lock(client_data->replies_mutex); // Take ownership of the reply. - client_data->replies.emplace_back(*reply); + client_data->replies.emplace_back(std::make_unique(reply)); *reply = z_reply_null(); } { diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index de45faba..aa18cf06 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -139,10 +140,21 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data); ///============================================================================== -struct rmw_service_data_t +class ZenohQuery final { - std::size_t get_new_uid(); +public: + ZenohQuery(const z_query_t * query); + + ~ZenohQuery(); + const z_query_t get_query() const; + +private: + z_owned_query_t query_; +}; + +struct rmw_service_data_t +{ z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; @@ -158,11 +170,11 @@ struct rmw_service_data_t rmw_context_t * context; // Deque to store the queries in the order they arrive. - std::deque query_queue; + std::deque> query_queue; std::mutex query_queue_mutex; // Map to store the sequence_number -> query_id - std::map sequence_to_query_map; + std::unordered_map> sequence_to_query_map; std::mutex sequence_to_query_map_mutex; std::mutex internal_mutex; @@ -171,6 +183,19 @@ struct rmw_service_data_t ///============================================================================== +class ZenohReply final +{ +public: + ZenohReply(const z_owned_reply_t * reply); + + ~ZenohReply(); + + std::optional get_sample() const; + +private: + z_owned_reply_t reply_; +}; + struct rmw_client_data_t { z_owned_keyexpr_t keyexpr; @@ -179,8 +204,8 @@ struct rmw_client_data_t // Liveliness token for the client. zc_owned_liveliness_token_t token; - std::mutex message_mutex; - std::deque replies; + std::mutex replies_mutex; + std::deque> replies; const void * request_type_support_impl; const void * response_type_support_impl; @@ -193,6 +218,10 @@ struct rmw_client_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + uint8_t client_guid[RMW_GID_STORAGE_SIZE]; + + size_t get_next_sequence_number(); + std::mutex sequence_number_mutex; size_t sequence_number{1}; }; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index dd799dfe..8f4e3ef0 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -50,19 +50,12 @@ rmw_ret_t get_z_config(z_owned_config_t * config) if (zenoh_config_path[0] != '\0') { // If the environment variable is set, try to read the configuration from the file. *config = zc_config_from_file(zenoh_config_path); - RCUTILS_LOG_INFO_NAMED( - "ZenohConfiguration", - "Using zenoh configuration file pointed by '%s' envar: '%s'", kZenohConfigFileEnvVar, - zenoh_config_path); } else { // If the environment variable is not set use internal configuration static const std::string path_to_config_folder = ament_index_cpp::get_package_share_directory(rmw_zenoh_identifier) + "/config/"; const std::string default_zconfig_path = path_to_config_folder + kDefaultZenohConfigFileName; *config = zc_config_from_file(default_zconfig_path.c_str()); - RCUTILS_LOG_INFO_NAMED( - "ZenohConfiguration", - "Using default zenoh configuration file at '%s'", default_zconfig_path.c_str()); } // Verify that the configuration is valid. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp index 2f6ed398..353de39a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp @@ -56,15 +56,12 @@ rmw_ret_t zenoh_router_check(z_session_t session) // Define callback auto callback = [](const struct z_id_t * id, void * ctx) { const std::string id_str = zid_to_str(*id); - RCUTILS_LOG_INFO_NAMED( - "ZenohRouterCheck", - "A Zenoh router connected to the session with id '%s'", id_str.c_str()); // Note: Callback is guaranteed to never be called // concurrently according to z_info_routers_zid docstring (*(static_cast(ctx)))++; }; - rmw_ret_t ret; + rmw_ret_t ret = RMW_RET_OK; z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context); if (z_info_routers_zid(session, z_move(router_callback))) { RCUTILS_LOG_ERROR_NAMED( @@ -77,11 +74,6 @@ rmw_ret_t zenoh_router_check(z_session_t session) "ZenohRouterCheck", "No Zenoh router connected to the session"); ret = RMW_RET_ERROR; - } else { - RCUTILS_LOG_INFO_NAMED( - "ZenohRouterCheck", - "There are %d Zenoh routers connected to the session", context); - ret = RMW_RET_OK; } } diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index d547e4c0..e7e44a4b 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -16,6 +16,7 @@ #include "rmw/error_handling.h" #include "rmw/event.h" +#include "rmw/types.h" #include "detail/identifier.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp b/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp index 381f3dbb..6ad3f52d 100644 --- a/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "rcutils/allocator.h" + #include "rmw/get_network_flow_endpoints.h" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index 79e069ab..abcb6058 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -16,8 +16,11 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/get_node_info_and_types.h" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 9cf19aae..34f747f5 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -14,8 +14,11 @@ #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/error_handling.h" #include "rmw/get_service_names_and_types.h" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index 12795f41..d6f3b690 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -17,8 +17,11 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index c5218e1e..7e2f3d24 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -14,14 +14,16 @@ #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/error_handling.h" #include "rmw/get_topic_names_and_types.h" +#include "rmw/types.h" extern "C" { ///============================================================================== /// Return all topic names and types in the ROS graph. -// TODO(yadunund): Fix implementation once discovery information can be cached. rmw_ret_t rmw_get_topic_names_and_types( const rmw_node_t * node, diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 1b612d88..9cff5ca2 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -15,6 +15,7 @@ #include #include +#include #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" @@ -46,11 +47,6 @@ static void graph_sub_data_handler( { (void)data; z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Received key '%s'", - z_loan(keystr) - ); // Get the context impl from data. rmw_context_impl_s * context_impl = static_cast( @@ -250,11 +246,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const std::string liveliness_str = liveliness::subscription_token(context->actual_domain_id); // Query router/liveliness participants to get graph information before this session was started. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data...", - liveliness_str.c_str() - ); + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive // replies for the zc_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which @@ -275,9 +267,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) if (z_reply_is_ok(&reply)) { z_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf( - ">> [discovery] Received ('%s': '%.*s')\n", z_loan(keystr), - static_cast(sample.payload.len), sample.payload.start); context->impl->graph_cache.parse_put(z_loan(keystr)); z_drop(z_move(keystr)); } else { @@ -288,11 +277,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_drop(z_move(channel)); // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Setting up liveliness subscription on key: %s", - liveliness_str.c_str() - ); // Uncomment and rely on #if #endif blocks to enable this feature when building with // zenoh-pico since liveliness is only available in zenoh-c. @@ -351,7 +335,30 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // We destroy zenoh artifacts here instead of rmw_shutdown() as + // rmw_shutdown() is invoked before rmw_destroy_node() however we still need the session + // alive for the latter. + // TODO(Yadunund): Check if this is a bug in rmw. + z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + // Close the zenoh session + if (z_close(z_move(context->impl->session)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + + const rcutils_allocator_t * allocator = &context->options.allocator; + + z_drop(z_move(context->impl->shm_manager)); + + RMW_TRY_DESTRUCTOR( + static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), + GuardCondition, ); + allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + + allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + context->impl->is_shutdown = true; + return RMW_RET_OK; } @@ -375,29 +382,10 @@ rmw_context_fini(rmw_context_t * context) return RMW_RET_INVALID_ARGUMENT; } - // We destroy zenoh artifacts here instead of rmw_shutdown() as - // rmw_shutdown() is invoked before rmw_destroy_node() however we still need the session - // alive for the latter. - // TODO(Yadunund): Check if this is a bug in rmw. - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - // Close the zenoh session - if (z_close(z_move(context->impl->session)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } + RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, ); const rcutils_allocator_t * allocator = &context->options.allocator; - z_drop(z_move(context->impl->shm_manager)); - - RMW_TRY_DESTRUCTOR( - static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), - GuardCondition, ); - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); - - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); - - RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, ); allocator->deallocate(context->impl, allocator->state); rmw_ret_t ret = rmw_init_options_fini(&context->options); diff --git a/rmw_zenoh_cpp/src/rmw_init_options.cpp b/rmw_zenoh_cpp/src/rmw_init_options.cpp index c34a6262..9eb146a4 100644 --- a/rmw_zenoh_cpp/src/rmw_init_options.cpp +++ b/rmw_zenoh_cpp/src/rmw_init_options.cpp @@ -17,7 +17,7 @@ #include "detail/identifier.hpp" #include "detail/rmw_init_options_impl.hpp" - +#include "rcutils/allocator.h" #include "rcutils/strdup.h" #include "rcutils/types.h" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5653fae6..0e1d53c4 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -19,9 +19,15 @@ #include #include +#include +#include #include #include +#include +#include #include +#include +#include #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" @@ -521,6 +527,13 @@ rmw_create_publisher( allocator->deallocate(publisher_data, allocator->state); }); + RMW_TRY_PLACEMENT_NEW(publisher_data, publisher_data, return nullptr, rmw_publisher_data_t); + auto destruct_publisher_data = rcpputils::make_scope_exit( + [publisher_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t); + }); + publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -650,6 +663,7 @@ rmw_create_publisher( free_topic_name.cancel(); destruct_msg_type_support.cancel(); free_type_support.cancel(); + destruct_publisher_data.cancel(); free_publisher_data.cancel(); free_rmw_publisher.cancel(); @@ -707,6 +721,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) RMW_SET_ERROR_MSG("failed to undeclare pub"); ret = RMW_RET_ERROR; } + RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); allocator->deallocate(publisher_data, allocator->state); } allocator->deallocate(const_cast(publisher->topic_name), allocator->state); @@ -1462,8 +1477,7 @@ static rmw_ret_t __rmw_take( const rmw_subscription_t * subscription, void * ros_message, bool * taken, - rmw_message_info_t * message_info, - rmw_subscription_allocation_t * allocation) + rmw_message_info_t * message_info) { RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); @@ -1476,13 +1490,15 @@ static rmw_ret_t __rmw_take( subscription->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - static_cast(allocation); - *taken = false; auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + if (sub_data->context->impl->is_shutdown) { + return RMW_RET_OK; + } + // RETRIEVE SERIALIZED MESSAGE =============================================== std::unique_ptr msg_data; @@ -1494,9 +1510,8 @@ static rmw_ret_t __rmw_take( return RMW_RET_OK; } - // NOTE(CH3): Potential place to handle "QoS" (e.g. could pop from back so it is LIFO) - msg_data = std::move(sub_data->message_queue.back()); - sub_data->message_queue.pop_back(); + msg_data = std::move(sub_data->message_queue.front()); + sub_data->message_queue.pop_front(); } // Object that manages the raw buffer @@ -1544,9 +1559,11 @@ rmw_take( bool * taken, rmw_subscription_allocation_t * allocation) { + static_cast(allocation); + rmw_message_info_t dummy_msg_info; - return __rmw_take(subscription, ros_message, taken, &dummy_msg_info, allocation); + return __rmw_take(subscription, ros_message, taken, &dummy_msg_info); } //============================================================================== @@ -1559,7 +1576,8 @@ rmw_take_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - return __rmw_take(subscription, ros_message, taken, message_info, allocation); + static_cast(allocation); + return __rmw_take(subscription, ros_message, taken, message_info); } //============================================================================== @@ -1662,6 +1680,18 @@ rmw_return_loaned_message_from_subscription( return RMW_RET_UNSUPPORTED; } +static void generate_random_guid(uint8_t guid[RMW_GID_STORAGE_SIZE]) +{ + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist( + std::numeric_limits::min(), std::numeric_limits::max()); + + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + guid[i] = dist(rng); + } +} + //============================================================================== /// Create a service client that can send requests to and receive replies from a service server. rmw_client_t * @@ -1671,11 +1701,6 @@ rmw_create_client( const char * service_name, const rmw_qos_profile_t * qos_profile) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_common_cpp", - "[rmw_create_client] %s with queue of depth %ld", - service_name, - qos_profile->depth); RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -1760,6 +1785,8 @@ rmw_create_client( rmw_client_data_t); }); + generate_random_guid(client_data->client_guid); + // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -1900,11 +1927,11 @@ rmw_create_client( free_rmw_client.cancel(); free_client_data.cancel(); - free_request_type_support.cancel(); destruct_request_type_support.cancel(); + free_request_type_support.cancel(); + destruct_response_type_support.cancel(); free_response_type_support.cancel(); destruct_client_data.cancel(); - destruct_response_type_support.cancel(); free_service_name.cancel(); free_ros_keyexpr.cancel(); @@ -1942,14 +1969,18 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - for (z_owned_reply_t & reply : client_data->replies) { - z_reply_drop(&reply); - } client_data->replies.clear(); z_drop(z_move(client_data->token)); + RMW_TRY_DESTRUCTOR( + client_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); allocator->deallocate(client_data->request_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR( + client_data->response_type_support->~ResponseTypeSupport(), ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + allocator->deallocate(client->data, allocator->state); allocator->deallocate(const_cast(client->service_name), allocator->state); @@ -1958,23 +1989,48 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_OK; } -static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number) +static z_owned_bytes_map_t create_map_and_set_sequence_num( + int64_t sequence_number, uint8_t guid[RMW_GID_STORAGE_SIZE]) { z_owned_bytes_map_t map = z_bytes_map_new(); if (!z_check(map)) { RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); return z_bytes_map_null(); } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. char seq_id_str[20]; - if (rcutils_snprintf(seq_id_str, 20, "%" PRId64, sequence_number) < 0) { + if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); return z_bytes_map_null(); } z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + char source_ts_str[20]; + if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); + + z_bytes_t guid_bytes; + guid_bytes.len = RMW_GID_STORAGE_SIZE; + guid_bytes.start = static_cast(malloc(RMW_GID_STORAGE_SIZE)); + memcpy(static_cast(const_cast(guid_bytes.start)), guid, RMW_GID_STORAGE_SIZE); + + z_bytes_map_insert_by_copy(&map, z_bytes_new("client_guid"), guid_bytes); + + free(const_cast(guid_bytes.start)); + + free_attachment_map.cancel(); + return map; } @@ -1986,8 +2042,6 @@ rmw_send_request( const void * ros_request, int64_t * sequence_id) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_send_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); @@ -2046,13 +2100,12 @@ rmw_send_request( size_t data_length = ser.getSerializedDataLength(); - // TODO(clalancette): Locking for multiple requests at the same time - *sequence_id = client_data->sequence_number++; + *sequence_id = client_data->get_next_sequence_number(); // Send request z_get_options_t opts = z_get_options_default(); - z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id); + z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id, client_data->client_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -2065,8 +2118,9 @@ rmw_send_request( opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // Latest consolidation guarantees unicity of replies for the same key expression. It optimizes bandwidth. - // Default is None which imples replies may come in any order and any number. + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); @@ -2078,63 +2132,86 @@ rmw_send_request( return RMW_RET_OK; } -static int64_t get_sequence_num_from_attachment(const z_attachment_t * const attachment) +static int64_t get_int64_from_attachment( + const z_attachment_t * const attachment, const std::string & name) { - // Get the sequence_number out of the attachment if (!z_check(*attachment)) { // A valid request must have had an attachment RMW_SET_ERROR_MSG("Could not get attachment from query"); return -1; } - z_bytes_t sequence_num_index = z_attachment_get(*attachment, z_bytes_new("sequence_number")); - if (!z_check(sequence_num_index)) { - // A valid request must have had a sequence number attached - RMW_SET_ERROR_MSG("Could not get sequence number from query"); + z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str())); + if (!z_check(index)) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Could not get %s from attachment", name.c_str()); return -1; } - if (sequence_num_index.len < 1) { - RMW_SET_ERROR_MSG("No value specified for the sequence number"); + if (index.len < 1) { + RMW_SET_ERROR_MSG("no value specified"); return -1; } - if (sequence_num_index.len > 19) { - // The sequence number was larger than we expected - RMW_SET_ERROR_MSG("Sequence number too large"); + if (index.len > 19) { + // The number was larger than we expected + RMW_SET_ERROR_MSG("number too large"); return -1; } // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char sequence_num_str[20]; + char int64_str[20]; - memcpy(sequence_num_str, sequence_num_index.start, sequence_num_index.len); - sequence_num_str[sequence_num_index.len] = '\0'; + memcpy(int64_str, index.start, index.len); + int64_str[index.len] = '\0'; errno = 0; char * endptr; - int64_t seqnum = strtol(sequence_num_str, &endptr, 10); - if (seqnum == 0) { + int64_t num = strtol(int64_str, &endptr, 10); + if (num == 0) { // This is an error regardless; the client should never send this - RMW_SET_ERROR_MSG("A invalid zero value sent as the sequence number"); + RMW_SET_ERROR_MSG("a invalid zero value sent"); return -1; - } else if (endptr == sequence_num_str) { + } else if (endptr == int64_str) { // No values were converted, this is an error - RMW_SET_ERROR_MSG("No valid numbers available in the sequence number"); + RMW_SET_ERROR_MSG("no valid numbers available"); return -1; } else if (*endptr != '\0') { // There was junk after the number - RMW_SET_ERROR_MSG("Non-numeric values in the sequence number"); + RMW_SET_ERROR_MSG("non-numeric values"); return -1; } else if (errno != 0) { // Some other error occurred, which may include overflow or underflow RMW_SET_ERROR_MSG( - "An undefined error occurred while getting the sequence number, this may be an overflow"); + "an undefined error occurred while getting the number, this may be an overflow"); return -1; } - return seqnum; + return num; +} + +static bool get_client_guid_from_attachment( + const z_attachment_t * const attachment, uint8_t guid[RMW_GID_STORAGE_SIZE]) +{ + if (!z_check(*attachment)) { + RMW_SET_ERROR_MSG("Could not get client_guid from attachment"); + return false; + } + + z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("client_guid")); + if (!z_check(index)) { + RMW_SET_ERROR_MSG("Could not get client_guid from attachment"); + return false; + } + + if (index.len != RMW_GID_STORAGE_SIZE) { + RMW_SET_ERROR_MSG("Invalid size for GUID storage"); + return false; + } + + memcpy(guid, index.start, index.len); + + return true; } //============================================================================== @@ -2147,7 +2224,6 @@ rmw_take_response( bool * taken) { *taken = false; - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); @@ -2165,20 +2241,26 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - z_owned_reply_t * latest_reply = nullptr; - - std::lock_guard lock(client_data->message_mutex); - if (client_data->replies.empty()) { - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + std::unique_ptr latest_reply = nullptr; + { + std::lock_guard lock(client_data->replies_mutex); + if (client_data->replies.empty()) { + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; + } + latest_reply = std::move(client_data->replies.front()); + client_data->replies.pop_front(); + } + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); return RMW_RET_ERROR; } - latest_reply = &client_data->replies.front(); - z_sample_t sample = z_reply_ok(latest_reply); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample.payload.start)), - sample.payload.len); + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); // Object that serializes the data eprosima::fastcdr::Cdr deser( @@ -2194,18 +2276,34 @@ rmw_take_response( return RMW_RET_ERROR; } - request_header->request_id.sequence_number = get_sequence_num_from_attachment(&sample.attachment); + // Fill in the request_header + + request_header->request_id.sequence_number = + get_int64_from_attachment(&sample->attachment, "sequence_number"); if (request_header->request_id.sequence_number < 0) { - // get_sequence_num_from_attachment already set an error + // get_int64_from_attachment already set an error return RMW_RET_ERROR; } - // TODO(clalancette): We also need to fill in the source_timestamp, received_timestamp, - // and writer_guid - *taken = true; + request_header->source_timestamp = + get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + // get_int64_from_attachment already set an error + return RMW_RET_ERROR; + } + + if (!get_client_guid_from_attachment( + &sample->attachment, request_header->request_id.writer_guid)) + { + // get_client_guid_from_attachment already set an error + return RMW_RET_ERROR; + } - client_data->replies.pop_front(); - z_reply_drop(latest_reply); + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + + *taken = true; return RMW_RET_OK; } @@ -2245,11 +2343,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profiles) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[rmw_create_service] %s", - service_name); - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -2532,13 +2625,19 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - for (z_owned_query_t & query : service_data->query_queue) { - z_drop(z_move(query)); - } + service_data->sequence_to_query_map.clear(); + service_data->query_queue.clear(); z_drop(z_move(service_data->token)); + RMW_TRY_DESTRUCTOR( + service_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); allocator->deallocate(service_data->request_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR( + service_data->response_type_support->~ResponseTypeSupport(), ResponseTypeSupport, ); allocator->deallocate(service_data->response_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR(service_data->~rmw_service_data_t(), rmw_service_data_t, ); allocator->deallocate(service->data, allocator->state); allocator->deallocate(const_cast(service->service_name), allocator->state); @@ -2557,7 +2656,6 @@ rmw_take_request( bool * taken) { *taken = false; - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); @@ -2577,37 +2675,17 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - z_owned_query_t query; + std::unique_ptr query = nullptr; { std::lock_guard lock(service_data->query_queue_mutex); if (service_data->query_queue.empty()) { return RMW_RET_OK; } - query = service_data->query_queue.front(); + query = std::move(service_data->query_queue.front()); + service_data->query_queue.pop_front(); } - const z_query_t loaned_query = z_query_loan(&query); - - // Get the sequence_number out of the attachment - z_attachment_t attachment = z_query_attachment(&loaned_query); - - int64_t sequence_number = get_sequence_num_from_attachment(&attachment); - if (sequence_number < 0) { - // get_sequence_number_from_attachment already set the error - return RMW_RET_ERROR; - } - - // Add this query to the map, so that rmw_send_response can quickly look it up later - { - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - if (service_data->sequence_to_query_map.find(sequence_number) != - service_data->sequence_to_query_map.end()) - { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); - return RMW_RET_ERROR; - } - service_data->sequence_to_query_map.emplace(std::pair(sequence_number, query)); - } + const z_query_t loaned_query = query->get_query(); // DESERIALIZE MESSAGE ======================================================== z_value_t payload_value = z_query_value(&loaned_query); @@ -2632,11 +2710,44 @@ rmw_take_request( } // Fill in the request header. - // TODO(clalancette): We also need to fill in writer_guid, source_timestamp, - // and received_timestamp - request_header->request_id.sequence_number = sequence_number; - service_data->query_queue.pop_front(); + // Get the sequence_number out of the attachment + z_attachment_t attachment = z_query_attachment(&loaned_query); + + request_header->request_id.sequence_number = + get_int64_from_attachment(&attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + // get_int64_from_attachment already set the error + return RMW_RET_ERROR; + } + + request_header->source_timestamp = get_int64_from_attachment(&attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + // get_int64_from_attachment already set the error + return RMW_RET_ERROR; + } + + if (!get_client_guid_from_attachment(&attachment, request_header->request_id.writer_guid)) { + // get_client_guid_from_attachment already set an error + return RMW_RET_ERROR; + } + + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + + // Add this query to the map, so that rmw_send_response can quickly look it up later + { + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + if (service_data->sequence_to_query_map.find(request_header->request_id.sequence_number) != + service_data->sequence_to_query_map.end()) + { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + service_data->sequence_to_query_map.emplace( + std::pair(request_header->request_id.sequence_number, std::move(query))); + } *taken = true; @@ -2651,9 +2762,6 @@ rmw_send_response( rmw_request_id_t * request_header, void * ros_response) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_send_response]"); - RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); @@ -2716,11 +2824,11 @@ rmw_send_response( RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t loaned_query = z_query_loan(&query_it->second); + const z_query_t loaned_query = query_it->second->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); - // TODO(clalancette): We also need to fill in and send the writer_guid - z_owned_bytes_map_t map = create_map_and_set_sequence_num(request_header->sequence_number); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + request_header->sequence_number, request_header->writer_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -2737,7 +2845,6 @@ rmw_send_response( &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length, &options); - z_drop(z_move(query_it->second)); service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } @@ -2930,107 +3037,173 @@ rmw_destroy_wait_set(rmw_wait_set_t * wait_set) return RMW_RET_OK; } -//============================================================================== -/// Waits on sets of different entities and returns when one is ready. -rmw_ret_t -rmw_wait( +static bool has_triggered_condition( rmw_subscriptions_t * subscriptions, rmw_guard_conditions_t * guard_conditions, rmw_services_t * services, rmw_clients_t * clients, - rmw_events_t * events, - rmw_wait_set_t * wait_set, - const rmw_time_t * wait_timeout) + rmw_events_t * events) { - RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - wait set handle, - wait_set->implementation_identifier, rmw_zenoh_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - // TODO(yadunund): Switch to debug log level. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[rmw_wait] %ld subscriptions, %ld services, %ld clients, %ld events, %ld guard conditions", - subscriptions->subscriber_count, - services->service_count, - clients->client_count, - events->event_count, - guard_conditions->guard_condition_count); - - // TODO(yadunund): Switch to debug log level. - if (wait_timeout) { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_common_cpp", "[rmw_wait] TIMEOUT: %ld s %ld ns", - wait_timeout->sec, - wait_timeout->nsec); - } - - auto wait_set_data = static_cast(wait_set->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - wait_set_data, - "waitset data struct is null", - return RMW_RET_ERROR); + static_cast(events); if (guard_conditions) { - // Go through each of the guard conditions, and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { - // This is hard to track down, but each of the (void *) pointers in - // guard_conditions->guard_conditions points to the data field of the related - // rmw_guard_condition_t. So we can directly cast it to GuardCondition. GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr) { - gc->attach_condition(&wait_set_data->condition_variable); + if (gc != nullptr && gc->has_triggered()) { + return true; } } } + // TODO(clalancette): Deal with events + if (subscriptions) { - // Go through each of the subscriptions and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - sub_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(sub_data->internal_mutex); + if (!sub_data->message_queue.empty()) { + return true; + } } } } - if (services) { - // Go through each of the services and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - serv_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(serv_data->internal_mutex); + if (!serv_data->query_queue.empty()) { + return true; + } } } } if (clients) { - // Go through each of the clients and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - client_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(client_data->internal_mutex); + if (!client_data->replies.empty()) { + return true; + } } } } - std::unique_lock lock(wait_set_data->condition_mutex); + return false; +} - // According to the RMW documentation, if wait_timeout is NULL that means - // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait - // for that amount of time. - if (wait_timeout == nullptr) { - wait_set_data->condition_variable.wait(lock); - } else { - if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { - wait_set_data->condition_variable.wait_for( - lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); +//============================================================================== +/// Waits on sets of different entities and returns when one is ready. +rmw_ret_t +rmw_wait( + rmw_subscriptions_t * subscriptions, + rmw_guard_conditions_t * guard_conditions, + rmw_services_t * services, + rmw_clients_t * clients, + rmw_events_t * events, + rmw_wait_set_t * wait_set, + const rmw_time_t * wait_timeout) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + wait set handle, + wait_set->implementation_identifier, rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + auto wait_set_data = static_cast(wait_set->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + wait_set_data, + "waitset data struct is null", + return RMW_RET_ERROR); + + // rmw_wait should return *all* entities that have data available, and let the caller decide + // how to handle them. + // + // If there is no data currently available in any of the entities we were told to wait on, we + // we attach a context-global condition variable to each entity, calculate a timeout based on + // wait_timeout, and then sleep on the condition variable. If any of the entities has an event + // during that time, it will wake up from that sleep. + // + // If there is data currently available in one or more of the entities, then we'll skip attaching + // the condition variable, and skip the sleep, and instead just go to the last part. + // + // In the last part, we check every entity and see if there are conditions that make it ready. + // If that entity is not ready, then we set the pointer to it to nullptr in the wait set, which + // signals to the upper layers that it isn't ready. If something is ready, then we leave it as + // a valid pointer. + + bool skip_wait = has_triggered_condition( + subscriptions, guard_conditions, services, clients, events); + bool wait_result = true; + + if (!skip_wait) { + if (guard_conditions) { + // Attach the wait set condition variable to each guard condition. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { + // This is hard to track down, but each of the (void *) pointers in + // guard_conditions->guard_conditions points to the data field of the related + // rmw_guard_condition_t. So we can directly cast it to GuardCondition. + GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); + if (gc != nullptr) { + gc->attach_condition(&wait_set_data->condition_variable); + } + } + } + + if (subscriptions) { + // Attach the wait set condition variable to each subscription. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { + auto sub_data = static_cast(subscriptions->subscribers[i]); + if (sub_data != nullptr) { + std::lock_guard internal_lock(sub_data->internal_mutex); + sub_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (services) { + // Attach the wait set condition variable to each service. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + std::lock_guard internal_lock(serv_data->internal_mutex); + serv_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (clients) { + // Attach the wait set condition variable to each client. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < clients->client_count; ++i) { + rmw_client_data_t * client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + std::lock_guard internal_lock(client_data->internal_mutex); + client_data->condition = &wait_set_data->condition_variable; + } + } + } + + std::unique_lock lock(wait_set_data->condition_mutex); + + // According to the RMW documentation, if wait_timeout is NULL that means + // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait + // for that amount of time. + if (wait_timeout == nullptr) { + wait_set_data->condition_variable.wait(lock); + } else { + if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { + std::cv_status wait_status = wait_set_data->condition_variable.wait_for( + lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); + wait_result = wait_status == std::cv_status::no_timeout; + } } } @@ -3054,6 +3227,7 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { + std::lock_guard internal_lock(sub_data->internal_mutex); sub_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL @@ -3070,6 +3244,7 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { + std::lock_guard internal_lock(serv_data->internal_mutex); serv_data->condition = nullptr; if (serv_data->query_queue.empty()) { // Setting to nullptr lets rcl know that this service is not ready @@ -3084,6 +3259,7 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { + std::lock_guard internal_lock(client_data->internal_mutex); client_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL @@ -3095,8 +3271,7 @@ rmw_wait( } } - - return RMW_RET_OK; + return (skip_wait || wait_result) ? RMW_RET_OK : RMW_RET_TIMEOUT; } //==============================================================================