Skip to content

Commit

Permalink
Implement __rmw_take_serialized. (#113)
Browse files Browse the repository at this point in the history
* Implement __rmw_take_serialized.

This allows us to implement rmw_take_serialized_message
and rmw_take_serialize_message_with_info, both of which
are required for rosbag2.

Note that this implementation copies some parts of __rmw_take.
Given the different types involved (void * ros_message
vs. rmw_serialize_message_t *), I didn't see a readable way
to reduce this duplication.

Signed-off-by: Chris Lalancette <[email protected]>

* Style

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Chris Lalancette <[email protected]>
Signed-off-by: Yadunund <[email protected]>
Co-authored-by: Yadunund <[email protected]>
  • Loading branch information
clalancette and Yadunund authored Feb 19, 2024
1 parent e77082f commit 08f5c30
Showing 1 changed file with 69 additions and 9 deletions.
78 changes: 69 additions & 9 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,69 @@ rmw_take_sequence(
return RMW_RET_UNSUPPORTED;
}

//==============================================================================
static rmw_ret_t __rmw_take_serialized(
const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message,
bool * taken,
rmw_message_info_t * message_info)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(serialized_message, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

*taken = false;

auto sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

if (sub_data->context->impl->is_shutdown) {
return RMW_RET_OK;
}

// RETRIEVE SERIALIZED MESSAGE ===============================================

std::unique_ptr<saved_msg_data> msg_data = sub_data->pop_next_message();
if (msg_data == nullptr) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return RMW_RET_OK;
}

if (serialized_message->buffer_capacity < msg_data->payload.payload.len) {
rmw_ret_t ret =
rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len);
if (ret != RMW_RET_OK) {
return ret; // Error message already set
}
}
serialized_message->buffer_length = msg_data->payload.payload.len;
memcpy(
serialized_message->buffer, msg_data->payload.payload.start,
msg_data->payload.payload.len);

*taken = true;

// TODO(clalancette): fill in source_timestamp
message_info->source_timestamp = 0;
message_info->received_timestamp = msg_data->recv_timestamp;
// TODO(clalancette): fill in publication_sequence_number
message_info->publication_sequence_number = 0;
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_identifier;
memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, 16);
message_info->from_intra_process = false;

return RMW_RET_OK;
}

//==============================================================================
/// Take an incoming ROS message as a byte stream.
rmw_ret_t
Expand All @@ -1643,11 +1706,11 @@ rmw_take_serialized_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(serialized_message);
static_cast<void>(taken);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

rmw_message_info_t dummy_msg_info;

return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info);
}

//==============================================================================
Expand All @@ -1660,12 +1723,9 @@ rmw_take_serialized_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
static_cast<void>(subscription);
static_cast<void>(serialized_message);
static_cast<void>(taken);
static_cast<void>(message_info);
static_cast<void>(allocation);
return RMW_RET_UNSUPPORTED;

return __rmw_take_serialized(subscription, serialized_message, taken, message_info);
}

//==============================================================================
Expand Down

0 comments on commit 08f5c30

Please sign in to comment.