diff --git a/rmw_zenoh_config.json5 b/rmw_zenoh_config.json5 deleted file mode 100644 index 9b8d8887..00000000 --- a/rmw_zenoh_config.json5 +++ /dev/null @@ -1,18 +0,0 @@ -{ - /// The node's mode (router, peer or client) - mode: "peer", - - /// The default timeout to apply to queries in milliseconds. - queries_default_timeout: 10000, - - /// Configure internal transport parameters - transport: { - qos: { - enabled: true, - }, - // Shared memory configuration - shared_memory: { - enabled: false, - }, - }, -} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 51c75a6e..c442ddfd 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -112,8 +112,8 @@ void GraphCache::parse_put(const std::string & keyexpr) return graph_node.clients_; } }(entity, graph_node, entity_desc); - // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. - // Similarly, subscriptions and services are equivalent. + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; @@ -173,14 +173,6 @@ void GraphCache::parse_put(const std::string & keyexpr) topic_data_insertion.first->second->stats_.sub_count_ += sub_count; } } - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Added %s on topic %s with type %s to node /%s.", - entity_desc.c_str(), - topic_info.name_.c_str(), - topic_info.type_.c_str(), - graph_node.name_.c_str()); }; // Helper lambda to convert an Entity into a GraphNode. @@ -213,10 +205,6 @@ void GraphCache::parse_put(const std::string & keyexpr) NodeMap node_map = { {entity.node_name(), make_graph_node(entity, *this)}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.", - entity.node_name().c_str(), - entity.node_namespace().c_str()); return; } @@ -238,14 +226,7 @@ void GraphCache::parse_put(const std::string & keyexpr) // name but unique id. NodeMap::iterator insertion_it = ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); - if (insertion_it != ns_it->second.end()) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Added a new node /%s with id %s to an existing namespace %s in the graph.", - entity.node_name().c_str(), - entity.id().c_str(), - entity.node_namespace().c_str()); - } else { + if (insertion_it == ns_it->second.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to add a new node /%s with id %s an " @@ -369,8 +350,8 @@ void GraphCache::parse_del(const std::string & keyexpr) return graph_node.clients_; } }(entity, graph_node, entity_desc); - // For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent. - // Similarly, subscriptions and services are equivalent. + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients are equivalent. Similarly, subscriptions and services are equivalent. const std::size_t pub_count = entity.type() == EntityType::Publisher || entity.type() == EntityType::Client ? 1 : 0; const std::size_t sub_count = !pub_count; @@ -409,14 +390,6 @@ void GraphCache::parse_del(const std::string & keyexpr) // Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph. update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache); - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Removed %s on topic %s with type %s to node /%s.", - entity_desc.c_str(), - topic_info.name_.c_str(), - topic_info.type_.c_str(), - graph_node.name_.c_str()); }; // Lock the graph mutex before accessing the graph. @@ -483,11 +456,6 @@ void GraphCache::parse_del(const std::string & keyexpr) remove_topics(graph_node->clients_, EntityType::Client, *this); } ns_it->second.erase(node_it); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Removed node /%s from the graph.", - entity.node_name().c_str() - ); return; } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index b9c5a2ac..6add0f7c 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -61,7 +61,6 @@ struct TopicData using TopicDataPtr = std::shared_ptr; ///============================================================================= -// TODO(Yadunund): Expand to services and clients. struct GraphNode { std::string id_; @@ -82,7 +81,6 @@ struct GraphNode // Entires for service/client. TopicMap clients_ = {}; TopicMap services_ = {}; - }; using GraphNodePtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index f44a6e74..28e9d4fc 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -434,7 +434,7 @@ bool PublishToken::put( RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication."); return false; } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str()); + z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) { @@ -465,7 +465,7 @@ bool PublishToken::del( RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication."); return false; } - RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str()); + const z_delete_options_t options = z_delete_options_default(); if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) { RCUTILS_SET_ERROR_MSG("failed to delete liveliness key"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 72872e6a..7a76a2f4 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -14,7 +14,9 @@ #include +#include #include +#include #include #include "rcpputils/scope_exit.hpp" @@ -63,12 +65,12 @@ void sub_data_handler( sub_data->adapted_qos_profile.depth, z_loan(keystr)); - std::unique_ptr old = std::move(sub_data->message_queue.back()); + std::unique_ptr old = std::move(sub_data->message_queue.front()); z_drop(&old->payload); - sub_data->message_queue.pop_back(); + sub_data->message_queue.pop_front(); } - sub_data->message_queue.emplace_front( + sub_data->message_queue.emplace_back( std::make_unique( zc_sample_payload_rcinc(sample), sample->timestamp.time, sample->timestamp.id.id)); @@ -81,13 +83,24 @@ void sub_data_handler( } } +ZenohQuery::ZenohQuery(const z_query_t * query) +{ + query_ = z_query_clone(query); +} + +ZenohQuery::~ZenohQuery() +{ + z_drop(z_move(query_)); +} + +const z_query_t ZenohQuery::get_query() const +{ + return z_query_loan(&query_); +} + //============================================================================== void service_data_handler(const z_query_t * query, void * data) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[service_data_handler] triggered" - ); z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); auto drop_keystr = rcpputils::make_scope_exit( [&keystr]() { @@ -108,7 +121,7 @@ void service_data_handler(const z_query_t * query, void * data) // Get the query parameters and payload { std::lock_guard lock(service_data->query_queue_mutex); - service_data->query_queue.push_back(z_query_clone(query)); + service_data->query_queue.emplace_back(std::make_unique(query)); } { // Since we added new data, trigger the guard condition if it is available @@ -119,6 +132,31 @@ void service_data_handler(const z_query_t * query, void * data) } } +ZenohReply::ZenohReply(const z_owned_reply_t * reply) +{ + reply_ = *reply; +} + +ZenohReply::~ZenohReply() +{ + z_reply_drop(z_move(reply_)); +} + +std::optional ZenohReply::get_sample() const +{ + if (z_reply_is_ok(&reply_)) { + return z_reply_ok(&reply_); + } + + return std::nullopt; +} + +size_t rmw_client_data_t::get_next_sequence_number() +{ + std::lock_guard lock(sequence_number_mutex); + return sequence_number++; +} + //============================================================================== void client_data_handler(z_owned_reply_t * reply, void * data) { @@ -145,9 +183,9 @@ void client_data_handler(z_owned_reply_t * reply, void * data) return; } { - std::lock_guard msg_lock(client_data->message_mutex); + std::lock_guard msg_lock(client_data->replies_mutex); // Take ownership of the reply. - client_data->replies.emplace_back(*reply); + client_data->replies.emplace_back(std::make_unique(reply)); *reply = z_reply_null(); } { diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 462ef9f6..e8893eaf 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -147,10 +148,21 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data); ///============================================================================== -struct rmw_service_data_t +class ZenohQuery final { - std::size_t get_new_uid(); +public: + ZenohQuery(const z_query_t * query); + + ~ZenohQuery(); + const z_query_t get_query() const; + +private: + z_owned_query_t query_; +}; + +struct rmw_service_data_t +{ z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; @@ -170,11 +182,11 @@ struct rmw_service_data_t rmw_context_t * context; // Deque to store the queries in the order they arrive. - std::deque query_queue; + std::deque> query_queue; std::mutex query_queue_mutex; // Map to store the sequence_number -> query_id - std::map sequence_to_query_map; + std::unordered_map> sequence_to_query_map; std::mutex sequence_to_query_map_mutex; std::mutex internal_mutex; @@ -183,6 +195,19 @@ struct rmw_service_data_t ///============================================================================== +class ZenohReply final +{ +public: + ZenohReply(const z_owned_reply_t * reply); + + ~ZenohReply(); + + std::optional get_sample() const; + +private: + z_owned_reply_t reply_; +}; + struct rmw_client_data_t { z_owned_keyexpr_t keyexpr; @@ -195,8 +220,8 @@ struct rmw_client_data_t // Liveliness token for the client. zc_owned_liveliness_token_t token; - std::mutex message_mutex; - std::deque replies; + std::mutex replies_mutex; + std::deque> replies; const void * request_type_support_impl; const void * response_type_support_impl; @@ -209,6 +234,10 @@ struct rmw_client_data_t std::mutex internal_mutex; std::condition_variable * condition{nullptr}; + uint8_t client_guid[RMW_GID_STORAGE_SIZE]; + + size_t get_next_sequence_number(); + std::mutex sequence_number_mutex; size_t sequence_number{1}; }; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index dd799dfe..8f4e3ef0 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -50,19 +50,12 @@ rmw_ret_t get_z_config(z_owned_config_t * config) if (zenoh_config_path[0] != '\0') { // If the environment variable is set, try to read the configuration from the file. *config = zc_config_from_file(zenoh_config_path); - RCUTILS_LOG_INFO_NAMED( - "ZenohConfiguration", - "Using zenoh configuration file pointed by '%s' envar: '%s'", kZenohConfigFileEnvVar, - zenoh_config_path); } else { // If the environment variable is not set use internal configuration static const std::string path_to_config_folder = ament_index_cpp::get_package_share_directory(rmw_zenoh_identifier) + "/config/"; const std::string default_zconfig_path = path_to_config_folder + kDefaultZenohConfigFileName; *config = zc_config_from_file(default_zconfig_path.c_str()); - RCUTILS_LOG_INFO_NAMED( - "ZenohConfiguration", - "Using default zenoh configuration file at '%s'", default_zconfig_path.c_str()); } // Verify that the configuration is valid. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp index 2f6ed398..353de39a 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp @@ -56,15 +56,12 @@ rmw_ret_t zenoh_router_check(z_session_t session) // Define callback auto callback = [](const struct z_id_t * id, void * ctx) { const std::string id_str = zid_to_str(*id); - RCUTILS_LOG_INFO_NAMED( - "ZenohRouterCheck", - "A Zenoh router connected to the session with id '%s'", id_str.c_str()); // Note: Callback is guaranteed to never be called // concurrently according to z_info_routers_zid docstring (*(static_cast(ctx)))++; }; - rmw_ret_t ret; + rmw_ret_t ret = RMW_RET_OK; z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context); if (z_info_routers_zid(session, z_move(router_callback))) { RCUTILS_LOG_ERROR_NAMED( @@ -77,11 +74,6 @@ rmw_ret_t zenoh_router_check(z_session_t session) "ZenohRouterCheck", "No Zenoh router connected to the session"); ret = RMW_RET_ERROR; - } else { - RCUTILS_LOG_INFO_NAMED( - "ZenohRouterCheck", - "There are %d Zenoh routers connected to the session", context); - ret = RMW_RET_OK; } } diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index d547e4c0..e7e44a4b 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -16,6 +16,7 @@ #include "rmw/error_handling.h" #include "rmw/event.h" +#include "rmw/types.h" #include "detail/identifier.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp b/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp index 381f3dbb..6ad3f52d 100644 --- a/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_network_flow_endpoints.cpp @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "rcutils/allocator.h" + #include "rmw/get_network_flow_endpoints.h" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index 79e069ab..abcb6058 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -16,8 +16,11 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/get_node_info_and_types.h" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 9cf19aae..34f747f5 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -14,8 +14,11 @@ #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/error_handling.h" #include "rmw/get_service_names_and_types.h" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index 12795f41..d6f3b690 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -17,8 +17,11 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/types.h" extern "C" { diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index c5218e1e..7e2f3d24 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -14,14 +14,16 @@ #include "detail/rmw_data_types.hpp" +#include "rcutils/allocator.h" + #include "rmw/error_handling.h" #include "rmw/get_topic_names_and_types.h" +#include "rmw/types.h" extern "C" { ///============================================================================== /// Return all topic names and types in the ROS graph. -// TODO(yadunund): Fix implementation once discovery information can be cached. rmw_ret_t rmw_get_topic_names_and_types( const rmw_node_t * node, diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 9636d206..fbfb9e52 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -15,6 +15,7 @@ #include #include +#include #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" @@ -46,11 +47,6 @@ static void graph_sub_data_handler( { (void)data; z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Received key '%s'", - z_loan(keystr) - ); // Get the context impl from data. rmw_context_impl_s * context_impl = static_cast( @@ -250,11 +246,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const std::string liveliness_str = liveliness::subscription_token(context->actual_domain_id); // Query router/liveliness participants to get graph information before this session was started. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Sending Query '%s' to fetch discovery data...", - liveliness_str.c_str() - ); + // We create a non-blocking channel that is unbounded, ie. `bound` = 0, to receive // replies for the zc_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which @@ -280,9 +272,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) if (z_reply_is_ok(&reply)) { z_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf( - ">> [discovery] Received ('%s': '%.*s')\n", z_loan(keystr), - static_cast(sample.payload.len), sample.payload.start); context->impl->graph_cache.parse_put(z_loan(keystr)); z_drop(z_move(keystr)); } else { @@ -293,11 +282,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_drop(z_move(channel)); // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "Setting up liveliness subscription on key: %s", - liveliness_str.c_str() - ); // Uncomment and rely on #if #endif blocks to enable this feature when building with // zenoh-pico since liveliness is only available in zenoh-c. @@ -356,8 +340,6 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - context->impl->shm_manager = zc_shm_manager_null(); - z_drop(z_move(context->impl->shm_manager)); z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); // Close the zenoh session if (z_close(z_move(context->impl->session)) < 0) { @@ -365,7 +347,19 @@ rmw_shutdown(rmw_context_t * context) return RMW_RET_ERROR; } + const rcutils_allocator_t * allocator = &context->options.allocator; + + z_drop(z_move(context->impl->shm_manager)); + + RMW_TRY_DESTRUCTOR( + static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), + GuardCondition, ); + allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + + allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + context->impl->is_shutdown = true; + return RMW_RET_OK; } @@ -389,16 +383,10 @@ rmw_context_fini(rmw_context_t * context) return RMW_RET_INVALID_ARGUMENT; } - const rcutils_allocator_t * allocator = &context->options.allocator; - - RMW_TRY_DESTRUCTOR( - static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), - GuardCondition, ); - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, ); - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + const rcutils_allocator_t * allocator = &context->options.allocator; - RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, ); allocator->deallocate(context->impl, allocator->state); rmw_ret_t ret = rmw_init_options_fini(&context->options); diff --git a/rmw_zenoh_cpp/src/rmw_init_options.cpp b/rmw_zenoh_cpp/src/rmw_init_options.cpp index c34a6262..9eb146a4 100644 --- a/rmw_zenoh_cpp/src/rmw_init_options.cpp +++ b/rmw_zenoh_cpp/src/rmw_init_options.cpp @@ -17,7 +17,7 @@ #include "detail/identifier.hpp" #include "detail/rmw_init_options_impl.hpp" - +#include "rcutils/allocator.h" #include "rcutils/strdup.h" #include "rcutils/types.h" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 2105bef0..b5bd5eac 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -19,9 +19,15 @@ #include #include +#include +#include #include #include +#include +#include #include +#include +#include #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" @@ -455,12 +461,6 @@ rmw_create_publisher( return nullptr; } } - - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[rmw_create_publisher] %s", - topic_name); - RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); if (publisher_options->require_unique_network_flow_endpoints == RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) @@ -523,11 +523,10 @@ rmw_create_publisher( }); RMW_TRY_PLACEMENT_NEW(publisher_data, publisher_data, return nullptr, rmw_publisher_data_t); - auto destruct_pub_data = rcpputils::make_scope_exit( + auto destruct_publisher_data = rcpputils::make_scope_exit( [publisher_data]() { RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - publisher_data->~rmw_publisher_data_t(), - rmw_publisher_data_t); + publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t); }); // Adapt any 'best available' QoS options @@ -694,9 +693,9 @@ rmw_create_publisher( undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); - destruct_pub_data.cancel(); destruct_msg_type_support.cancel(); free_type_support.cancel(); + destruct_publisher_data.cancel(); free_publisher_data.cancel(); free_rmw_publisher.cancel(); @@ -1209,11 +1208,6 @@ rmw_create_subscription( RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[rmw_create_subscription] %s", - topic_name); - const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { // error was already set by find_message_type_support @@ -1598,8 +1592,7 @@ static rmw_ret_t __rmw_take( const rmw_subscription_t * subscription, void * ros_message, bool * taken, - rmw_message_info_t * message_info, - rmw_subscription_allocation_t * allocation) + rmw_message_info_t * message_info) { RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); @@ -1612,13 +1605,15 @@ static rmw_ret_t __rmw_take( subscription->implementation_identifier, rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - static_cast(allocation); - *taken = false; auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + if (sub_data->context->impl->is_shutdown) { + return RMW_RET_OK; + } + // RETRIEVE SERIALIZED MESSAGE =============================================== std::unique_ptr msg_data; @@ -1630,9 +1625,8 @@ static rmw_ret_t __rmw_take( return RMW_RET_OK; } - // NOTE(CH3): Potential place to handle "QoS" (e.g. could pop from back so it is LIFO) - msg_data = std::move(sub_data->message_queue.back()); - sub_data->message_queue.pop_back(); + msg_data = std::move(sub_data->message_queue.front()); + sub_data->message_queue.pop_front(); } // Object that manages the raw buffer @@ -1680,9 +1674,11 @@ rmw_take( bool * taken, rmw_subscription_allocation_t * allocation) { + static_cast(allocation); + rmw_message_info_t dummy_msg_info; - return __rmw_take(subscription, ros_message, taken, &dummy_msg_info, allocation); + return __rmw_take(subscription, ros_message, taken, &dummy_msg_info); } //============================================================================== @@ -1695,7 +1691,8 @@ rmw_take_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - return __rmw_take(subscription, ros_message, taken, message_info, allocation); + static_cast(allocation); + return __rmw_take(subscription, ros_message, taken, message_info); } //============================================================================== @@ -1798,6 +1795,18 @@ rmw_return_loaned_message_from_subscription( return RMW_RET_UNSUPPORTED; } +static void generate_random_guid(uint8_t guid[RMW_GID_STORAGE_SIZE]) +{ + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist( + std::numeric_limits::min(), std::numeric_limits::max()); + + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + guid[i] = dist(rng); + } +} + //============================================================================== /// Create a service client that can send requests to and receive replies from a service server. rmw_client_t * @@ -1807,11 +1816,6 @@ rmw_create_client( const char * service_name, const rmw_qos_profile_t * qos_profile) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_common_cpp", - "[rmw_create_client] %s with queue of depth %ld", - service_name, - qos_profile->depth); RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -1896,9 +1900,7 @@ rmw_create_client( rmw_client_data_t); }); - // Adapt any 'best available' QoS options - client_data->adapted_qos_profile = - rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); + generate_random_guid(client_data->client_guid); // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); @@ -2040,11 +2042,11 @@ rmw_create_client( free_rmw_client.cancel(); free_client_data.cancel(); - free_request_type_support.cancel(); destruct_request_type_support.cancel(); + free_request_type_support.cancel(); + destruct_response_type_support.cancel(); free_response_type_support.cancel(); destruct_client_data.cancel(); - destruct_response_type_support.cancel(); free_service_name.cancel(); free_ros_keyexpr.cancel(); @@ -2082,14 +2084,18 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - for (z_owned_reply_t & reply : client_data->replies) { - z_reply_drop(&reply); - } client_data->replies.clear(); z_drop(z_move(client_data->token)); + RMW_TRY_DESTRUCTOR( + client_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); allocator->deallocate(client_data->request_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR( + client_data->response_type_support->~ResponseTypeSupport(), ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + allocator->deallocate(client->data, allocator->state); allocator->deallocate(const_cast(client->service_name), allocator->state); @@ -2098,23 +2104,48 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_OK; } -static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number) +static z_owned_bytes_map_t create_map_and_set_sequence_num( + int64_t sequence_number, uint8_t guid[RMW_GID_STORAGE_SIZE]) { z_owned_bytes_map_t map = z_bytes_map_new(); if (!z_check(map)) { RMW_SET_ERROR_MSG("failed to allocate map for sequence number"); return z_bytes_map_null(); } + auto free_attachment_map = rcpputils::make_scope_exit( + [&map]() { + z_bytes_map_drop(z_move(map)); + }); // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. char seq_id_str[20]; - if (rcutils_snprintf(seq_id_str, 20, "%" PRId64, sequence_number) < 0) { + if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) { RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); return z_bytes_map_null(); } z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str)); + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + char source_ts_str[20]; + if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { + RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + return z_bytes_map_null(); + } + z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); + + z_bytes_t guid_bytes; + guid_bytes.len = RMW_GID_STORAGE_SIZE; + guid_bytes.start = static_cast(malloc(RMW_GID_STORAGE_SIZE)); + memcpy(static_cast(const_cast(guid_bytes.start)), guid, RMW_GID_STORAGE_SIZE); + + z_bytes_map_insert_by_copy(&map, z_bytes_new("client_guid"), guid_bytes); + + free(const_cast(guid_bytes.start)); + + free_attachment_map.cancel(); + return map; } @@ -2126,8 +2157,6 @@ rmw_send_request( const void * ros_request, int64_t * sequence_id) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_send_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); @@ -2186,13 +2215,12 @@ rmw_send_request( size_t data_length = ser.getSerializedDataLength(); - // TODO(clalancette): Locking for multiple requests at the same time - *sequence_id = client_data->sequence_number++; + *sequence_id = client_data->get_next_sequence_number(); // Send request z_get_options_t opts = z_get_options_default(); - z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id); + z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id, client_data->client_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -2205,9 +2233,10 @@ rmw_send_request( opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; - // Latest consolidation guarantees unicity of replies for the same key expression. It optimizes bandwidth. - // Default is None which imples replies may come in any order and any number. - opts.consolidation = z_query_consolidation_default(); + // Latest consolidation guarantees unicity of replies for the same key expression, + // which optimizes bandwidth. The default is "None", which imples replies may come in any order + // and any number. + opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); @@ -2218,63 +2247,86 @@ rmw_send_request( return RMW_RET_OK; } -static int64_t get_sequence_num_from_attachment(const z_attachment_t * const attachment) +static int64_t get_int64_from_attachment( + const z_attachment_t * const attachment, const std::string & name) { - // Get the sequence_number out of the attachment if (!z_check(*attachment)) { // A valid request must have had an attachment RMW_SET_ERROR_MSG("Could not get attachment from query"); return -1; } - z_bytes_t sequence_num_index = z_attachment_get(*attachment, z_bytes_new("sequence_number")); - if (!z_check(sequence_num_index)) { - // A valid request must have had a sequence number attached - RMW_SET_ERROR_MSG("Could not get sequence number from query"); + z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str())); + if (!z_check(index)) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Could not get %s from attachment", name.c_str()); return -1; } - if (sequence_num_index.len < 1) { - RMW_SET_ERROR_MSG("No value specified for the sequence number"); + if (index.len < 1) { + RMW_SET_ERROR_MSG("no value specified"); return -1; } - if (sequence_num_index.len > 19) { - // The sequence number was larger than we expected - RMW_SET_ERROR_MSG("Sequence number too large"); + if (index.len > 19) { + // The number was larger than we expected + RMW_SET_ERROR_MSG("number too large"); return -1; } // The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807. // That is 19 characters long, plus one for the trailing \0, means we need 20 bytes. - char sequence_num_str[20]; + char int64_str[20]; - memcpy(sequence_num_str, sequence_num_index.start, sequence_num_index.len); - sequence_num_str[sequence_num_index.len] = '\0'; + memcpy(int64_str, index.start, index.len); + int64_str[index.len] = '\0'; errno = 0; char * endptr; - int64_t seqnum = strtol(sequence_num_str, &endptr, 10); - if (seqnum == 0) { + int64_t num = strtol(int64_str, &endptr, 10); + if (num == 0) { // This is an error regardless; the client should never send this - RMW_SET_ERROR_MSG("A invalid zero value sent as the sequence number"); + RMW_SET_ERROR_MSG("a invalid zero value sent"); return -1; - } else if (endptr == sequence_num_str) { + } else if (endptr == int64_str) { // No values were converted, this is an error - RMW_SET_ERROR_MSG("No valid numbers available in the sequence number"); + RMW_SET_ERROR_MSG("no valid numbers available"); return -1; } else if (*endptr != '\0') { // There was junk after the number - RMW_SET_ERROR_MSG("Non-numeric values in the sequence number"); + RMW_SET_ERROR_MSG("non-numeric values"); return -1; } else if (errno != 0) { // Some other error occurred, which may include overflow or underflow RMW_SET_ERROR_MSG( - "An undefined error occurred while getting the sequence number, this may be an overflow"); + "an undefined error occurred while getting the number, this may be an overflow"); return -1; } - return seqnum; + return num; +} + +static bool get_client_guid_from_attachment( + const z_attachment_t * const attachment, uint8_t guid[RMW_GID_STORAGE_SIZE]) +{ + if (!z_check(*attachment)) { + RMW_SET_ERROR_MSG("Could not get client_guid from attachment"); + return false; + } + + z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("client_guid")); + if (!z_check(index)) { + RMW_SET_ERROR_MSG("Could not get client_guid from attachment"); + return false; + } + + if (index.len != RMW_GID_STORAGE_SIZE) { + RMW_SET_ERROR_MSG("Invalid size for GUID storage"); + return false; + } + + memcpy(guid, index.start, index.len); + + return true; } //============================================================================== @@ -2287,7 +2339,6 @@ rmw_take_response( bool * taken) { *taken = false; - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(client->data, RMW_RET_INVALID_ARGUMENT); @@ -2305,20 +2356,26 @@ rmw_take_response( RMW_CHECK_FOR_NULL_WITH_MSG( client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); - z_owned_reply_t * latest_reply = nullptr; - - std::lock_guard lock(client_data->message_mutex); - if (client_data->replies.empty()) { - RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + std::unique_ptr latest_reply = nullptr; + { + std::lock_guard lock(client_data->replies_mutex); + if (client_data->replies.empty()) { + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; + } + latest_reply = std::move(client_data->replies.front()); + client_data->replies.pop_front(); + } + std::optional sample = latest_reply->get_sample(); + if (!sample) { + RMW_SET_ERROR_MSG("invalid reply sample"); return RMW_RET_ERROR; } - latest_reply = &client_data->replies.front(); - z_sample_t sample = z_reply_ok(latest_reply); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(sample.payload.start)), - sample.payload.len); + reinterpret_cast(const_cast(sample->payload.start)), + sample->payload.len); // Object that serializes the data eprosima::fastcdr::Cdr deser( @@ -2334,18 +2391,34 @@ rmw_take_response( return RMW_RET_ERROR; } - request_header->request_id.sequence_number = get_sequence_num_from_attachment(&sample.attachment); + // Fill in the request_header + + request_header->request_id.sequence_number = + get_int64_from_attachment(&sample->attachment, "sequence_number"); if (request_header->request_id.sequence_number < 0) { - // get_sequence_num_from_attachment already set an error + // get_int64_from_attachment already set an error return RMW_RET_ERROR; } - // TODO(clalancette): We also need to fill in the source_timestamp, received_timestamp, - // and writer_guid - *taken = true; + request_header->source_timestamp = + get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + // get_int64_from_attachment already set an error + return RMW_RET_ERROR; + } - client_data->replies.pop_front(); - z_reply_drop(latest_reply); + if (!get_client_guid_from_attachment( + &sample->attachment, request_header->request_id.writer_guid)) + { + // get_client_guid_from_attachment already set an error + return RMW_RET_ERROR; + } + + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + + *taken = true; return RMW_RET_OK; } @@ -2392,11 +2465,6 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profiles) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "[rmw_create_service] %s", - service_name); - // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -2684,13 +2752,19 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_drop(z_move(service_data->qable)); - for (z_owned_query_t & query : service_data->query_queue) { - z_drop(z_move(query)); - } + service_data->sequence_to_query_map.clear(); + service_data->query_queue.clear(); z_drop(z_move(service_data->token)); + RMW_TRY_DESTRUCTOR( + service_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); allocator->deallocate(service_data->request_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR( + service_data->response_type_support->~ResponseTypeSupport(), ResponseTypeSupport, ); allocator->deallocate(service_data->response_type_support, allocator->state); + + RMW_TRY_DESTRUCTOR(service_data->~rmw_service_data_t(), rmw_service_data_t, ); allocator->deallocate(service->data, allocator->state); allocator->deallocate(const_cast(service->service_name), allocator->state); @@ -2709,7 +2783,6 @@ rmw_take_request( bool * taken) { *taken = false; - RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); @@ -2729,37 +2802,17 @@ rmw_take_request( RMW_CHECK_FOR_NULL_WITH_MSG( service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT); - z_owned_query_t query; + std::unique_ptr query = nullptr; { std::lock_guard lock(service_data->query_queue_mutex); if (service_data->query_queue.empty()) { return RMW_RET_OK; } - query = service_data->query_queue.front(); + query = std::move(service_data->query_queue.front()); + service_data->query_queue.pop_front(); } - const z_query_t loaned_query = z_query_loan(&query); - - // Get the sequence_number out of the attachment - z_attachment_t attachment = z_query_attachment(&loaned_query); - - int64_t sequence_number = get_sequence_num_from_attachment(&attachment); - if (sequence_number < 0) { - // get_sequence_number_from_attachment already set the error - return RMW_RET_ERROR; - } - - // Add this query to the map, so that rmw_send_response can quickly look it up later - { - std::lock_guard lock(service_data->sequence_to_query_map_mutex); - if (service_data->sequence_to_query_map.find(sequence_number) != - service_data->sequence_to_query_map.end()) - { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); - return RMW_RET_ERROR; - } - service_data->sequence_to_query_map.emplace(std::pair(sequence_number, query)); - } + const z_query_t loaned_query = query->get_query(); // DESERIALIZE MESSAGE ======================================================== z_value_t payload_value = z_query_value(&loaned_query); @@ -2784,11 +2837,44 @@ rmw_take_request( } // Fill in the request header. - // TODO(clalancette): We also need to fill in writer_guid, source_timestamp, - // and received_timestamp - request_header->request_id.sequence_number = sequence_number; - service_data->query_queue.pop_front(); + // Get the sequence_number out of the attachment + z_attachment_t attachment = z_query_attachment(&loaned_query); + + request_header->request_id.sequence_number = + get_int64_from_attachment(&attachment, "sequence_number"); + if (request_header->request_id.sequence_number < 0) { + // get_int64_from_attachment already set the error + return RMW_RET_ERROR; + } + + request_header->source_timestamp = get_int64_from_attachment(&attachment, "source_timestamp"); + if (request_header->source_timestamp < 0) { + // get_int64_from_attachment already set the error + return RMW_RET_ERROR; + } + + if (!get_client_guid_from_attachment(&attachment, request_header->request_id.writer_guid)) { + // get_client_guid_from_attachment already set an error + return RMW_RET_ERROR; + } + + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ns = std::chrono::duration_cast(now); + request_header->received_timestamp = now_ns.count(); + + // Add this query to the map, so that rmw_send_response can quickly look it up later + { + std::lock_guard lock(service_data->sequence_to_query_map_mutex); + if (service_data->sequence_to_query_map.find(request_header->request_id.sequence_number) != + service_data->sequence_to_query_map.end()) + { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + service_data->sequence_to_query_map.emplace( + std::pair(request_header->request_id.sequence_number, std::move(query))); + } *taken = true; @@ -2803,9 +2889,6 @@ rmw_send_response( rmw_request_id_t * request_header, void * ros_response) { - RCUTILS_LOG_INFO_NAMED( - "rmw_zenoh_cpp", "[rmw_send_response]"); - RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service->data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); @@ -2868,11 +2951,11 @@ rmw_send_response( RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); return RMW_RET_ERROR; } - const z_query_t loaned_query = z_query_loan(&query_it->second); + const z_query_t loaned_query = query_it->second->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); - // TODO(clalancette): We also need to fill in and send the writer_guid - z_owned_bytes_map_t map = create_map_and_set_sequence_num(request_header->sequence_number); + z_owned_bytes_map_t map = create_map_and_set_sequence_num( + request_header->sequence_number, request_header->writer_guid); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -2889,7 +2972,6 @@ rmw_send_response( &loaned_query, z_loan(service_data->keyexpr), reinterpret_cast( response_bytes), data_length, &options); - z_drop(z_move(query_it->second)); service_data->sequence_to_query_map.erase(query_it); return RMW_RET_OK; } @@ -3089,107 +3171,173 @@ rmw_destroy_wait_set(rmw_wait_set_t * wait_set) return RMW_RET_OK; } -//============================================================================== -/// Waits on sets of different entities and returns when one is ready. -rmw_ret_t -rmw_wait( +static bool has_triggered_condition( rmw_subscriptions_t * subscriptions, rmw_guard_conditions_t * guard_conditions, rmw_services_t * services, rmw_clients_t * clients, - rmw_events_t * events, - rmw_wait_set_t * wait_set, - const rmw_time_t * wait_timeout) + rmw_events_t * events) { - RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - wait set handle, - wait_set->implementation_identifier, rmw_zenoh_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - // TODO(yadunund): Switch to debug log level. - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "[rmw_wait] %ld subscriptions, %ld services, %ld clients, %ld events, %ld guard conditions", - subscriptions->subscriber_count, - services->service_count, - clients->client_count, - events->event_count, - guard_conditions->guard_condition_count); - - // TODO(yadunund): Switch to debug log level. - if (wait_timeout) { - RCUTILS_LOG_DEBUG_NAMED( - "rmw_zenoh_common_cpp", "[rmw_wait] TIMEOUT: %ld s %ld ns", - wait_timeout->sec, - wait_timeout->nsec); - } - - auto wait_set_data = static_cast(wait_set->data); - RMW_CHECK_FOR_NULL_WITH_MSG( - wait_set_data, - "waitset data struct is null", - return RMW_RET_ERROR); + static_cast(events); if (guard_conditions) { - // Go through each of the guard conditions, and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { - // This is hard to track down, but each of the (void *) pointers in - // guard_conditions->guard_conditions points to the data field of the related - // rmw_guard_condition_t. So we can directly cast it to GuardCondition. GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr) { - gc->attach_condition(&wait_set_data->condition_variable); + if (gc != nullptr && gc->has_triggered()) { + return true; } } } + // TODO(clalancette): Deal with events + if (subscriptions) { - // Go through each of the subscriptions and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - sub_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(sub_data->internal_mutex); + if (!sub_data->message_queue.empty()) { + return true; + } } } } - if (services) { - // Go through each of the services and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - serv_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(serv_data->internal_mutex); + if (!serv_data->query_queue.empty()) { + return true; + } } } } if (clients) { - // Go through each of the clients and attach the wait set condition variable to them. - // That way they can wake it up if they are triggered while we are waiting. for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - client_data->condition = &wait_set_data->condition_variable; + std::lock_guard internal_lock(client_data->internal_mutex); + if (!client_data->replies.empty()) { + return true; + } } } } - std::unique_lock lock(wait_set_data->condition_mutex); + return false; +} - // According to the RMW documentation, if wait_timeout is NULL that means - // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait - // for that amount of time. - if (wait_timeout == nullptr) { - wait_set_data->condition_variable.wait(lock); - } else { - if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { - wait_set_data->condition_variable.wait_for( - lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); +//============================================================================== +/// Waits on sets of different entities and returns when one is ready. +rmw_ret_t +rmw_wait( + rmw_subscriptions_t * subscriptions, + rmw_guard_conditions_t * guard_conditions, + rmw_services_t * services, + rmw_clients_t * clients, + rmw_events_t * events, + rmw_wait_set_t * wait_set, + const rmw_time_t * wait_timeout) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(wait_set, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + wait set handle, + wait_set->implementation_identifier, rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + auto wait_set_data = static_cast(wait_set->data); + RMW_CHECK_FOR_NULL_WITH_MSG( + wait_set_data, + "waitset data struct is null", + return RMW_RET_ERROR); + + // rmw_wait should return *all* entities that have data available, and let the caller decide + // how to handle them. + // + // If there is no data currently available in any of the entities we were told to wait on, we + // we attach a context-global condition variable to each entity, calculate a timeout based on + // wait_timeout, and then sleep on the condition variable. If any of the entities has an event + // during that time, it will wake up from that sleep. + // + // If there is data currently available in one or more of the entities, then we'll skip attaching + // the condition variable, and skip the sleep, and instead just go to the last part. + // + // In the last part, we check every entity and see if there are conditions that make it ready. + // If that entity is not ready, then we set the pointer to it to nullptr in the wait set, which + // signals to the upper layers that it isn't ready. If something is ready, then we leave it as + // a valid pointer. + + bool skip_wait = has_triggered_condition( + subscriptions, guard_conditions, services, clients, events); + bool wait_result = true; + + if (!skip_wait) { + if (guard_conditions) { + // Attach the wait set condition variable to each guard condition. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { + // This is hard to track down, but each of the (void *) pointers in + // guard_conditions->guard_conditions points to the data field of the related + // rmw_guard_condition_t. So we can directly cast it to GuardCondition. + GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); + if (gc != nullptr) { + gc->attach_condition(&wait_set_data->condition_variable); + } + } + } + + if (subscriptions) { + // Attach the wait set condition variable to each subscription. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { + auto sub_data = static_cast(subscriptions->subscribers[i]); + if (sub_data != nullptr) { + std::lock_guard internal_lock(sub_data->internal_mutex); + sub_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (services) { + // Attach the wait set condition variable to each service. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + std::lock_guard internal_lock(serv_data->internal_mutex); + serv_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (clients) { + // Attach the wait set condition variable to each client. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < clients->client_count; ++i) { + rmw_client_data_t * client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + std::lock_guard internal_lock(client_data->internal_mutex); + client_data->condition = &wait_set_data->condition_variable; + } + } + } + + std::unique_lock lock(wait_set_data->condition_mutex); + + // According to the RMW documentation, if wait_timeout is NULL that means + // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait + // for that amount of time. + if (wait_timeout == nullptr) { + wait_set_data->condition_variable.wait(lock); + } else { + if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { + std::cv_status wait_status = wait_set_data->condition_variable.wait_for( + lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); + wait_result = wait_status == std::cv_status::no_timeout; + } } } @@ -3213,6 +3361,7 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { + std::lock_guard internal_lock(sub_data->internal_mutex); sub_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL @@ -3229,6 +3378,7 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { + std::lock_guard internal_lock(serv_data->internal_mutex); serv_data->condition = nullptr; if (serv_data->query_queue.empty()) { // Setting to nullptr lets rcl know that this service is not ready @@ -3243,6 +3393,7 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { + std::lock_guard internal_lock(client_data->internal_mutex); client_data->condition = nullptr; // According to the documentation for rmw_wait in rmw.h, entries in the // array that have *not* been triggered should be set to NULL @@ -3254,8 +3405,7 @@ rmw_wait( } } - - return RMW_RET_OK; + return (skip_wait || wait_result) ? RMW_RET_OK : RMW_RET_TIMEOUT; } //==============================================================================