diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 7c4a27eb..7a80d2ca 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) +find_package(tracetools REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc_debug QUIET) if(NOT zenohc_debug_FOUND) @@ -72,6 +73,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw + tracetools::tracetools zenohc::lib ) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index e1d2a24a..97695c92 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -25,6 +25,7 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw + tracetools ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index bd3f3f6e..83677515 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -145,6 +145,7 @@ bool NodeData::create_pub_data( auto pub_data = PublisherData::make( std::move(session), + publisher, node_, entity_->node_info(), id_, diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index b2ea8235..53c23843 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -35,11 +35,14 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" +#include "tracetools/tracetools.h" + namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr PublisherData::make( z_session_t session, + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -189,6 +192,7 @@ std::shared_ptr PublisherData::make( return std::shared_ptr( new PublisherData{ + rmw_publisher, node, std::move(entity), std::move(pub), @@ -201,6 +205,7 @@ std::shared_ptr PublisherData::make( ///============================================================================= PublisherData::PublisherData( + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * rmw_node, std::shared_ptr entity, z_owned_publisher_t pub, @@ -208,7 +213,8 @@ PublisherData::PublisherData( zc_owned_liveliness_token_t token, const void * type_support_impl, std::unique_ptr type_support) -: rmw_node_(rmw_node), +: rmw_publisher_(rmw_publisher), + rmw_node_(rmw_node), entity_(std::move(entity)), pub_(std::move(pub)), pub_cache_(std::move(pub_cache)), @@ -296,6 +302,7 @@ rmw_ret_t PublisherData::publish( const size_t data_length = ser.get_serialized_data_length(); + int64_t source_timestamp = 0; z_owned_bytes_map_t map = create_map_and_set_sequence_num( sequence_number_++, @@ -308,7 +315,8 @@ rmw_ret_t PublisherData::publish( gid_bytes.len = RMW_GID_STORAGE_SIZE; gid_bytes.start = local_gid; z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); + }, + &source_timestamp); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error return RMW_RET_ERROR; @@ -325,6 +333,8 @@ rmw_ret_t PublisherData::publish( z_publisher_put_options_t options = z_publisher_put_options_default(); options.attachment = z_bytes_map_as_attachment(&map); + TRACETOOLS_TRACEPOINT( + rmw_publish, static_cast(rmw_publisher_), ros_message, source_timestamp); if (shmbuf.has_value()) { zc_shmbuf_set_length(&shmbuf.value(), data_length); zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value())); @@ -360,6 +370,7 @@ rmw_ret_t PublisherData::publish_serialized_message( std::lock_guard lock(mutex_); + int64_t source_timestamp = 0; z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num( sequence_number_++, [this](z_owned_bytes_map_t * map, const char * key) @@ -372,7 +383,8 @@ rmw_ret_t PublisherData::publish_serialized_message( gid_bytes.len = RMW_GID_STORAGE_SIZE; gid_bytes.start = local_gid; z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes); - }); + }, + &source_timestamp); if (!z_check(map)) { // create_map_and_set_sequence_num already set the error @@ -391,6 +403,8 @@ rmw_ret_t PublisherData::publish_serialized_message( z_publisher_put_options_t options = z_publisher_put_options_default(); options.attachment = z_bytes_map_as_attachment(&map); + TRACETOOLS_TRACEPOINT( + rmw_publish, static_cast(rmw_publisher_), serialized_message, source_timestamp); // Returns 0 if success. int8_t ret = z_publisher_put( z_loan(pub_), diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 7b124cc3..df2c1d93 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -43,6 +43,7 @@ class PublisherData final // Make a shared_ptr of PublisherData. static std::shared_ptr make( z_session_t session, + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -88,6 +89,7 @@ class PublisherData final private: // Constructor. PublisherData( + const rmw_publisher_t * const rmw_publisher, const rmw_node_t * rmw_node, std::shared_ptr entity, z_owned_publisher_t pub, @@ -98,6 +100,8 @@ class PublisherData final // Internal mutex. mutable std::mutex mutex_; + // The rmw publisher + const rmw_publisher_t * rmw_publisher_; // The parent node. const rmw_node_t * rmw_node_; // The Entity generated for the publisher. diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 2e6c8e0c..5bf1e6af 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -27,7 +27,8 @@ namespace rmw_zenoh_cpp z_owned_bytes_map_t create_map_and_set_sequence_num( int64_t sequence_number, - GIDCopier gid_copier) + GIDCopier gid_copier, + int64_t * source_timestamp) { z_owned_bytes_map_t map = z_bytes_map_new(); if (!z_check(map)) { @@ -50,9 +51,12 @@ create_map_and_set_sequence_num( auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); + if (nullptr != source_timestamp) { + *source_timestamp = now_ns.count(); + } char source_ts_str[20]; if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) { - RMW_SET_ERROR_MSG("failed to print sequence_number into buffer"); + RMW_SET_ERROR_MSG("failed to print source_timestamp into buffer"); return z_bytes_map_null(); } z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str)); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index f7ec26b2..d614428b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -31,7 +31,8 @@ namespace rmw_zenoh_cpp using GIDCopier = std::function; ///============================================================================= z_owned_bytes_map_t -create_map_and_set_sequence_num(int64_t sequence_number, GIDCopier gid_copier); +create_map_and_set_sequence_num( + int64_t sequence_number, GIDCopier gid_copier, int64_t * source_timestamp = nullptr); ///============================================================================= // A class to store the replies to service requests. diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index eebe1c4b..289ffc0b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -56,6 +56,8 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" +#include "tracetools/tracetools.h" + namespace { //============================================================================== @@ -454,6 +456,13 @@ rmw_create_publisher( free_topic_name.cancel(); free_rmw_publisher.cancel(); + if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_publisher_init)) { + rmw_gid_t gid{}; + if (RMW_RET_OK == rmw_get_gid_for_publisher(rmw_publisher, &gid)) { + TRACETOOLS_DO_TRACEPOINT( + rmw_publisher_init, static_cast(rmw_publisher), gid.data); + } + } return rmw_publisher; } @@ -990,6 +999,11 @@ rmw_create_subscription( free_topic_name.cancel(); free_rmw_subscription.cancel(); + // rmw does not require GIDs for subscriptions, and GIDs in rmw_zenoh are not based on any ID of + // the underlying zenoh objects, so there is no need to collect a GID here + rmw_gid_t gid{}; + TRACETOOLS_TRACEPOINT( + rmw_subscription_init, static_cast(rmw_subscription), gid.data); return rmw_subscription; } @@ -1134,7 +1148,18 @@ rmw_take( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return sub_data->take_one_message(ros_message, nullptr, taken); + if (!TRACETOOLS_TRACEPOINT_ENABLED(rmw_take)) { + return sub_data->take_one_message(ros_message, nullptr, taken); + } + rmw_message_info_t message_info{}; + rmw_ret_t ret = sub_data->take_one_message(ros_message, &message_info, taken); + TRACETOOLS_DO_TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(ros_message), + message_info.source_timestamp, + *taken); + return ret; } //============================================================================== @@ -1163,7 +1188,14 @@ rmw_take_with_info( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return sub_data->take_one_message(ros_message, message_info, taken); + rmw_ret_t ret = sub_data->take_one_message(ros_message, message_info, taken); + TRACETOOLS_TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(ros_message), + message_info->source_timestamp, + *taken); + return ret; } //============================================================================== @@ -1269,11 +1301,18 @@ __rmw_take_serialized( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return sub_data->take_serialized_message( + rmw_ret_t ret = sub_data->take_serialized_message( serialized_message, taken, message_info ); + TRACETOOLS_TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(serialized_message), + message_info->source_timestamp, + *taken); + return ret; } } // namespace