Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing instrumentation using tracetools #294

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(tracetools REQUIRED)
find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc_debug QUIET)
if(NOT zenohc_debug_FOUND)
Expand Down Expand Up @@ -72,6 +73,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
tracetools::tracetools
zenohc::lib
)

Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>tracetools</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ bool NodeData::create_pub_data(

auto pub_data = PublisherData::make(
std::move(session),
publisher,
node_,
entity_->node_info(),
id_,
Expand Down
20 changes: 17 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
z_session_t session,
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -189,6 +192,7 @@ std::shared_ptr<PublisherData> PublisherData::make(

return std::shared_ptr<PublisherData>(
new PublisherData{
rmw_publisher,
node,
std::move(entity),
std::move(pub),
Expand All @@ -201,14 +205,16 @@ std::shared_ptr<PublisherData> PublisherData::make(

///=============================================================================
PublisherData::PublisherData(
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zc_owned_liveliness_token_t token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
: rmw_publisher_(rmw_publisher),
rmw_node_(rmw_node),
entity_(std::move(entity)),
pub_(std::move(pub)),
pub_cache_(std::move(pub_cache)),
Expand Down Expand Up @@ -296,6 +302,7 @@ rmw_ret_t PublisherData::publish(

const size_t data_length = ser.get_serialized_data_length();

int64_t source_timestamp = 0;
z_owned_bytes_map_t map =
create_map_and_set_sequence_num(
sequence_number_++,
Expand All @@ -308,7 +315,8 @@ rmw_ret_t PublisherData::publish(
gid_bytes.len = RMW_GID_STORAGE_SIZE;
gid_bytes.start = local_gid;
z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes);
});
},
&source_timestamp);
if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
Expand All @@ -325,6 +333,8 @@ rmw_ret_t PublisherData::publish(
z_publisher_put_options_t options = z_publisher_put_options_default();
options.attachment = z_bytes_map_as_attachment(&map);

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
if (shmbuf.has_value()) {
zc_shmbuf_set_length(&shmbuf.value(), data_length);
zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value()));
Expand Down Expand Up @@ -360,6 +370,7 @@ rmw_ret_t PublisherData::publish_serialized_message(

std::lock_guard<std::mutex> lock(mutex_);

int64_t source_timestamp = 0;
z_owned_bytes_map_t map = rmw_zenoh_cpp::create_map_and_set_sequence_num(
sequence_number_++,
[this](z_owned_bytes_map_t * map, const char * key)
Expand All @@ -372,7 +383,8 @@ rmw_ret_t PublisherData::publish_serialized_message(
gid_bytes.len = RMW_GID_STORAGE_SIZE;
gid_bytes.start = local_gid;
z_bytes_map_insert_by_copy(map, z_bytes_new(key), gid_bytes);
});
},
&source_timestamp);

if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
Expand All @@ -391,6 +403,8 @@ rmw_ret_t PublisherData::publish_serialized_message(
z_publisher_put_options_t options = z_publisher_put_options_default();
options.attachment = z_bytes_map_as_attachment(&map);

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);
// Returns 0 if success.
int8_t ret = z_publisher_put(
z_loan(pub_),
Expand Down
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class PublisherData final
// Make a shared_ptr of PublisherData.
static std::shared_ptr<PublisherData> make(
z_session_t session,
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -88,6 +89,7 @@ class PublisherData final
private:
// Constructor.
PublisherData(
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
Expand All @@ -98,6 +100,8 @@ class PublisherData final

// Internal mutex.
mutable std::mutex mutex_;
// The rmw publisher
const rmw_publisher_t * rmw_publisher_;
// The parent node.
const rmw_node_t * rmw_node_;
// The Entity generated for the publisher.
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace rmw_zenoh_cpp
z_owned_bytes_map_t
create_map_and_set_sequence_num(
int64_t sequence_number,
GIDCopier gid_copier)
GIDCopier gid_copier,
int64_t * source_timestamp)
{
z_owned_bytes_map_t map = z_bytes_map_new();
if (!z_check(map)) {
Expand All @@ -50,9 +51,12 @@ create_map_and_set_sequence_num(

auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
if (nullptr != source_timestamp) {
*source_timestamp = now_ns.count();
}
char source_ts_str[20];
if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) {
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
RMW_SET_ERROR_MSG("failed to print source_timestamp into buffer");
return z_bytes_map_null();
}
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str));
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace rmw_zenoh_cpp
using GIDCopier = std::function<void (z_owned_bytes_map_t *, const char *)>;
///=============================================================================
z_owned_bytes_map_t
create_map_and_set_sequence_num(int64_t sequence_number, GIDCopier gid_copier);
create_map_and_set_sequence_num(
int64_t sequence_number, GIDCopier gid_copier, int64_t * source_timestamp = nullptr);

///=============================================================================
// A class to store the replies to service requests.
Expand Down
45 changes: 42 additions & 3 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
#include "rmw/validate_namespace.h"
#include "rmw/validate_node_name.h"

#include "tracetools/tracetools.h"

namespace
{
//==============================================================================
Expand Down Expand Up @@ -454,6 +456,13 @@ rmw_create_publisher(
free_topic_name.cancel();
free_rmw_publisher.cancel();

if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_publisher_init)) {
rmw_gid_t gid{};
if (RMW_RET_OK == rmw_get_gid_for_publisher(rmw_publisher, &gid)) {
TRACETOOLS_DO_TRACEPOINT(
rmw_publisher_init, static_cast<const void *>(rmw_publisher), gid.data);
}
}
return rmw_publisher;
}

Expand Down Expand Up @@ -990,6 +999,11 @@ rmw_create_subscription(
free_topic_name.cancel();
free_rmw_subscription.cancel();

// rmw does not require GIDs for subscriptions, and GIDs in rmw_zenoh are not based on any ID of
// the underlying zenoh objects, so there is no need to collect a GID here
rmw_gid_t gid{};
TRACETOOLS_TRACEPOINT(
rmw_subscription_init, static_cast<const void *>(rmw_subscription), gid.data);
return rmw_subscription;
}

Expand Down Expand Up @@ -1134,7 +1148,18 @@ rmw_take(
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_one_message(ros_message, nullptr, taken);
if (!TRACETOOLS_TRACEPOINT_ENABLED(rmw_take)) {
return sub_data->take_one_message(ros_message, nullptr, taken);
}
rmw_message_info_t message_info{};
rmw_ret_t ret = sub_data->take_one_message(ros_message, &message_info, taken);
TRACETOOLS_DO_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(ros_message),
message_info.source_timestamp,
*taken);
return ret;
}

//==============================================================================
Expand Down Expand Up @@ -1163,7 +1188,14 @@ rmw_take_with_info(
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_one_message(ros_message, message_info, taken);
rmw_ret_t ret = sub_data->take_one_message(ros_message, message_info, taken);
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(ros_message),
message_info->source_timestamp,
*taken);
return ret;
}

//==============================================================================
Expand Down Expand Up @@ -1269,11 +1301,18 @@ __rmw_take_serialized(
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_serialized_message(
rmw_ret_t ret = sub_data->take_serialized_message(
serialized_message,
taken,
message_info
);
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
message_info->source_timestamp,
*taken);
return ret;
}
} // namespace

Expand Down
Loading