From d7243c0d80920da1736157747e1a6739fa0bc398 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 28 Jun 2024 15:33:38 -0400 Subject: [PATCH] Implement rmw_take_sequence. (#221) While this isn't currently used by rclcpp and rclpy, it *is* used inside of rcl, specifically in testing. Thus we need to implement it to pass all rcl tests. To reduce code duplication, this commit refactors __rmw_take() to handle this case. In particular, we rename __rmw_take() to __rmw_take_one(), with no error checking. Then we change rmw_take() and rmw_take_with_info() to do the error checking and call __rmw_take_one(). Finally we implement rmw_take_sequence() by calling __rwm_take_one() in a loop. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 185 ++++++++++++++++++++++---------- 1 file changed, 131 insertions(+), 54 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index ea0d0767..8aa972b9 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1701,37 +1701,17 @@ rmw_subscription_get_content_filter( return RMW_RET_UNSUPPORTED; } -static rmw_ret_t __rmw_take( - const rmw_subscription_t * subscription, +static rmw_ret_t __rmw_take_one( + rmw_zenoh_cpp::rmw_subscription_data_t * sub_data, void * ros_message, - bool * taken, - rmw_message_info_t * message_info) + rmw_message_info_t * message_info, + bool * taken) { - RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); - RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR); - RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - subscription handle, - subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - *taken = false; - auto sub_data = static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - - if (sub_data->context->impl->is_shutdown) { - return RMW_RET_OK; - } - - // RETRIEVE SERIALIZED MESSAGE =============================================== - std::unique_ptr msg_data = sub_data->pop_next_message(); if (msg_data == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. + // There are no more messages to take. return RMW_RET_OK; } @@ -1751,16 +1731,18 @@ static rmw_ret_t __rmw_take( return RMW_RET_ERROR; } - *taken = true; + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->source_timestamp; + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->sequence_number; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } - message_info->source_timestamp = msg_data->source_timestamp; - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); - message_info->from_intra_process = false; + *taken = true; return RMW_RET_OK; } @@ -1776,9 +1758,22 @@ rmw_take( { static_cast(allocation); - rmw_message_info_t dummy_msg_info; + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + *taken = false; - return __rmw_take(subscription, ros_message, taken, &dummy_msg_info); + auto sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + + return __rmw_take_one(sub_data, ros_message, nullptr, taken); } //============================================================================== @@ -1792,7 +1787,24 @@ rmw_take_with_info( rmw_subscription_allocation_t * allocation) { static_cast(allocation); - return __rmw_take(subscription, ros_message, taken, message_info); + + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + *taken = false; + + auto sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + + return __rmw_take_one(sub_data, ros_message, message_info, taken); } //============================================================================== @@ -1806,13 +1818,78 @@ rmw_take_sequence( size_t * taken, rmw_subscription_allocation_t * allocation) { - static_cast(subscription); - static_cast(count); - static_cast(message_sequence); - static_cast(message_info_sequence); - static_cast(taken); static_cast(allocation); - return RMW_RET_UNSUPPORTED; + + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(message_sequence, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + if (0u == count) { + RMW_SET_ERROR_MSG("count cannot be 0"); + return RMW_RET_INVALID_ARGUMENT; + } + + if (count > message_sequence->capacity) { + RMW_SET_ERROR_MSG("Insuffient capacity in message_sequence"); + return RMW_RET_INVALID_ARGUMENT; + } + + if (count > message_info_sequence->capacity) { + RMW_SET_ERROR_MSG("Insuffient capacity in message_info_sequence"); + return RMW_RET_INVALID_ARGUMENT; + } + + if (count > (std::numeric_limits::max)()) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Cannot take %zu samples at once, limit is %" PRIu32, + count, (std::numeric_limits::max)()); + return RMW_RET_ERROR; + } + + *taken = 0; + + auto sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + + if (sub_data->context->impl->is_shutdown) { + return RMW_RET_OK; + } + + rmw_ret_t ret; + + while (*taken < count) { + bool one_taken = false; + + ret = __rmw_take_one( + sub_data, message_sequence->data[*taken], + &message_info_sequence->data[*taken], &one_taken); + if (ret != RMW_RET_OK) { + // If we are taking a sequence and the 2nd take in the sequence failed, we'll report + // RMW_RET_ERROR to the caller, but we will *also* tell the caller that there are valid + // messages already taken (via the message_sequence size). It is up to the caller to deal + // with that situation appropriately. + break; + } + + if (!one_taken) { + // No error, but there was nothing left to be taken, so break out of the loop + break; + } + + (*taken)++; + } + + message_sequence->size = *taken; + message_info_sequence->size = *taken; + + return ret; } //============================================================================== @@ -1864,14 +1941,16 @@ static rmw_ret_t __rmw_take_serialized( *taken = true; - message_info->source_timestamp = msg_data->source_timestamp; - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); - message_info->from_intra_process = false; + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->source_timestamp; + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->sequence_number; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } return RMW_RET_OK; } @@ -1887,9 +1966,7 @@ rmw_take_serialized_message( { static_cast(allocation); - rmw_message_info_t dummy_msg_info; - - return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info); + return __rmw_take_serialized(subscription, serialized_message, taken, nullptr); } //==============================================================================