diff --git a/rmw_zenoh_cpp/src/detail/logging.cpp b/rmw_zenoh_cpp/src/detail/logging.cpp index b9ed8cbe..1977b0f2 100644 --- a/rmw_zenoh_cpp/src/detail/logging.cpp +++ b/rmw_zenoh_cpp/src/detail/logging.cpp @@ -36,6 +36,9 @@ void Logger::set_log_level(RCUTILS_LOG_SEVERITY new_level) void Logger::log_named( RCUTILS_LOG_SEVERITY level, + const char * function_name, + const char * file_name, + size_t line_number, const char * name, const char * message, ...) const @@ -47,7 +50,7 @@ void Logger::log_named( RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to get timestamp while doing a console logging.\n"); return; } - static rcutils_log_location_t log_location = {__func__, __FILE__, __LINE__}; + static rcutils_log_location_t log_location = {function_name, file_name, line_number}; va_list args; va_start(args, message); rcutils_logging_console_output_handler( diff --git a/rmw_zenoh_cpp/src/detail/logging.hpp b/rmw_zenoh_cpp/src/detail/logging.hpp index 171d5694..35ca11ed 100644 --- a/rmw_zenoh_cpp/src/detail/logging.hpp +++ b/rmw_zenoh_cpp/src/detail/logging.hpp @@ -32,6 +32,9 @@ class Logger // Log to the console. void log_named( RCUTILS_LOG_SEVERITY level, + const char * function_name, + const char * file_name, + size_t line_number, const char * name, const char * message, ...) const; diff --git a/rmw_zenoh_cpp/src/detail/logging_macros.hpp b/rmw_zenoh_cpp/src/detail/logging_macros.hpp index 1e6ac74f..81201856 100644 --- a/rmw_zenoh_cpp/src/detail/logging_macros.hpp +++ b/rmw_zenoh_cpp/src/detail/logging_macros.hpp @@ -24,14 +24,14 @@ // invoke GraphCache::parse_put() and GraphCache::parse_del() functions. // See https://github.com/ros2/rmw_zenoh/issues/182 for more details. #define RMW_ZENOH_LOG_DEBUG_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_DEBUG, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_DEBUG, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_ERROR_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_ERROR, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_ERROR, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_FATAL_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_FATAL, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_FATAL, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_INFO_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_INFO, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_INFO, __func__, __FILE__, __LINE__, __VA_ARGS__);} #define RMW_ZENOH_LOG_WARN_NAMED(...) {rmw_zenoh_cpp::Logger::get().log_named( \ - RCUTILS_LOG_SEVERITY_WARN, __VA_ARGS__);} + RCUTILS_LOG_SEVERITY_WARN, __func__, __FILE__, __LINE__, __VA_ARGS__);} #endif // DETAIL__LOGGING_MACROS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index eae1fdfc..51f84b33 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -28,6 +28,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" #include "attachment_helpers.hpp" #include "rmw_data_types.hpp" @@ -393,6 +394,42 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +void rmw_client_data_t::increment_in_flight_callbacks() +{ + std::lock_guard lock(in_flight_mutex_); + num_in_flight_++; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +bool rmw_client_data_t::shutdown_and_query_in_flight() +{ + std::lock_guard lock(in_flight_mutex_); + is_shutdown_ = true; + + return num_in_flight_ > 0; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. +bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(in_flight_mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + +bool rmw_client_data_t::is_shutdown() const +{ + std::lock_guard lock(in_flight_mutex_); + return is_shutdown_; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -531,6 +568,13 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (client_data->is_shutdown()) { + return; + } + if (!z_reply_check(reply)) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -556,4 +600,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } + +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + client_data->context->options.allocator.deallocate( + client_data, client_data->context->options.allocator.state); + } + } +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index f10e84f3..60e4bd01 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data); ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * client_data); +void client_data_drop(void * data); ///============================================================================= class ZenohQuery final @@ -297,7 +298,6 @@ class rmw_client_data_t final std::shared_ptr entity; z_owned_keyexpr_t keyexpr; - z_owned_closure_reply_t zn_closure_reply; // Store the actual QoS profile used to configure this client. // The QoS is reused for sending requests and getting responses. @@ -329,6 +329,17 @@ class rmw_client_data_t final DataCallbackManager data_callback_mgr; + // See the comment for "num_in_flight" below on the use of this method. + void increment_in_flight_callbacks(); + + // See the comment for "num_in_flight" below on the use of this method. + bool shutdown_and_query_in_flight(); + + // See the comment for "num_in_flight" below on the use of this method. + bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + + bool is_shutdown() const; + private: void notify(); @@ -340,6 +351,28 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; + + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // + // The next 3 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ + // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. + mutable std::mutex in_flight_mutex_; + bool is_shutdown_{false}; + size_t num_in_flight_{0}; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 21f72b34..4b0cb076 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -542,9 +542,6 @@ rmw_create_publisher( if (RMW_RET_OK != ret) { return nullptr; } - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "create publisher liveliness: %zu", - publisher_data->adapted_qos_profile.liveliness); publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_hash = type_support->get_type_hash_func(type_support); @@ -2360,7 +2357,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_INVALID_ARGUMENT); // CLEANUP =================================================================== - z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); zc_liveliness_undeclare_token(z_move(client_data->token)); @@ -2372,9 +2368,13 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (!client_data->shutdown_and_query_in_flight()) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + allocator->deallocate(client->data, allocator->state); + } allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -2407,6 +2407,10 @@ rmw_send_request( "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + if (client_data->is_shutdown()) { + return RMW_RET_ERROR; + } + rmw_context_impl_s * context_impl = static_cast( client_data->context->impl); @@ -2461,6 +2465,10 @@ rmw_send_request( z_bytes_map_drop(z_move(map)); }); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + client_data->increment_in_flight_callbacks(); + opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; @@ -2474,11 +2482,13 @@ rmw_send_request( // and any number. opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - client_data->zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data); + z_owned_closure_reply_t zn_closure_reply = + z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( - z_loan(context_impl->session), z_loan( - client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); + z_loan(context_impl->session), + z_loan(client_data->keyexpr), "", + z_move(zn_closure_reply), + &opts); return RMW_RET_OK; }