Skip to content

Commit

Permalink
Implement rmw_take_sequence.
Browse files Browse the repository at this point in the history
While this isn't currently used by rclcpp and rclpy,
it *is* used inside of rcl, specifically in testing.  Thus
we need to implement it to pass all rcl tests.

To reduce code duplication, this commit refactors __rmw_take()
to handle this case.  In particular, we rename __rmw_take() to
__rmw_take_one(), with no error checking.  Then we change
rmw_take() and rmw_take_with_info() to do the error checking
and call __rmw_take_one().  Finally we implement rmw_take_sequence()
by calling __rwm_take_one() in a loop.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jun 27, 2024
1 parent cdea502 commit 8facea7
Showing 1 changed file with 131 additions and 54 deletions.
185 changes: 131 additions & 54 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1701,37 +1701,17 @@ rmw_subscription_get_content_filter(
return RMW_RET_UNSUPPORTED;
}

static rmw_ret_t __rmw_take(
const rmw_subscription_t * subscription,
static rmw_ret_t __rmw_take_one(
rmw_zenoh_cpp::rmw_subscription_data_t * sub_data,
void * ros_message,
bool * taken,
rmw_message_info_t * message_info)
rmw_message_info_t * message_info,
bool * taken)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

*taken = false;

auto sub_data = static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

if (sub_data->context->impl->is_shutdown) {
return RMW_RET_OK;
}

// RETRIEVE SERIALIZED MESSAGE ===============================================

std::unique_ptr<rmw_zenoh_cpp::saved_msg_data> msg_data = sub_data->pop_next_message();
if (msg_data == nullptr) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
// There are no more messages to take.
return RMW_RET_OK;
}

Expand All @@ -1751,16 +1731,18 @@ static rmw_ret_t __rmw_take(
return RMW_RET_ERROR;
}

*taken = true;
if (message_info != nullptr) {
message_info->source_timestamp = msg_data->source_timestamp;
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->sequence_number;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
}

message_info->source_timestamp = msg_data->source_timestamp;
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->sequence_number;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
*taken = true;

return RMW_RET_OK;
}
Expand All @@ -1776,9 +1758,22 @@ rmw_take(
{
static_cast<void>(allocation);

rmw_message_info_t dummy_msg_info;
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

*taken = false;

return __rmw_take(subscription, ros_message, taken, &dummy_msg_info);
auto sub_data = static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return __rmw_take_one(sub_data, ros_message, nullptr, taken);
}

//==============================================================================
Expand All @@ -1792,7 +1787,24 @@ rmw_take_with_info(
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(allocation);
return __rmw_take(subscription, ros_message, taken, message_info);

RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

*taken = false;

auto sub_data = static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return __rmw_take_one(sub_data, ros_message, message_info, taken);
}

//==============================================================================
Expand All @@ -1806,13 +1818,78 @@ rmw_take_sequence(
size_t * taken,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(count);
static_cast<void>(message_sequence);
static_cast<void>(message_info_sequence);
static_cast<void>(taken);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(message_sequence, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

if (0u == count) {
RMW_SET_ERROR_MSG("count cannot be 0");
return RMW_RET_INVALID_ARGUMENT;
}

if (count > message_sequence->capacity) {
RMW_SET_ERROR_MSG("Insuffient capacity in message_sequence");
return RMW_RET_INVALID_ARGUMENT;
}

if (count > message_info_sequence->capacity) {
RMW_SET_ERROR_MSG("Insuffient capacity in message_info_sequence");
return RMW_RET_INVALID_ARGUMENT;
}

if (count > (std::numeric_limits<uint32_t>::max)()) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"Cannot take %zu samples at once, limit is %" PRIu32,
count, (std::numeric_limits<uint32_t>::max)());
return RMW_RET_ERROR;
}

*taken = 0;

auto sub_data = static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

if (sub_data->context->impl->is_shutdown) {
return RMW_RET_OK;
}

rmw_ret_t ret;

while (*taken < count) {
bool one_taken = false;

ret = __rmw_take_one(
sub_data, message_sequence->data[*taken],
&message_info_sequence->data[*taken], &one_taken);
if (ret != RMW_RET_OK) {
// If we are taking a sequence and the 2nd take in the sequence failed, we'll report
// RMW_RET_ERROR to the caller, but we will *also* tell the caller that there are valid
// messages already taken (via the message_sequence size). It is up to the caller to deal
// with that situation appropriately.
break;
}

if (!one_taken) {
// No error, but there was nothing left to be taken, so break out of the loop
break;
}

(*taken)++;
}

message_sequence->size = *taken;
message_info_sequence->size = *taken;

return ret;
}

//==============================================================================
Expand Down Expand Up @@ -1864,14 +1941,16 @@ static rmw_ret_t __rmw_take_serialized(

*taken = true;

message_info->source_timestamp = msg_data->source_timestamp;
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->sequence_number;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
if (message_info != nullptr) {
message_info->source_timestamp = msg_data->source_timestamp;
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->sequence_number;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
}

return RMW_RET_OK;
}
Expand All @@ -1887,9 +1966,7 @@ rmw_take_serialized_message(
{
static_cast<void>(allocation);

rmw_message_info_t dummy_msg_info;

return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info);
return __rmw_take_serialized(subscription, serialized_message, taken, nullptr);
}

//==============================================================================
Expand Down

0 comments on commit 8facea7

Please sign in to comment.