diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5b2f19b3..a2863270 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1634,6 +1634,69 @@ rmw_take_sequence( return RMW_RET_UNSUPPORTED; } +//============================================================================== +static rmw_ret_t __rmw_take_serialized( + const rmw_subscription_t * subscription, + rmw_serialized_message_t * serialized_message, + bool * taken, + rmw_message_info_t * message_info) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->topic_name, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(serialized_message, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + *taken = false; + + auto sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + + if (sub_data->context->impl->is_shutdown) { + return RMW_RET_OK; + } + + // RETRIEVE SERIALIZED MESSAGE =============================================== + + std::unique_ptr msg_data = sub_data->pop_next_message(); + if (msg_data == nullptr) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + + if (serialized_message->buffer_capacity < msg_data->payload.payload.len) { + rmw_ret_t ret = + rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len); + if (ret != RMW_RET_OK) { + return ret; // Error message already set + } + } + serialized_message->buffer_length = msg_data->payload.payload.len; + memcpy( + serialized_message->buffer, msg_data->payload.payload.start, + msg_data->payload.payload.len); + + *taken = true; + + // TODO(clalancette): fill in source_timestamp + message_info->source_timestamp = 0; + message_info->received_timestamp = msg_data->recv_timestamp; + // TODO(clalancette): fill in publication_sequence_number + message_info->publication_sequence_number = 0; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, 16); + message_info->from_intra_process = false; + + return RMW_RET_OK; +} + //============================================================================== /// Take an incoming ROS message as a byte stream. rmw_ret_t @@ -1643,11 +1706,11 @@ rmw_take_serialized_message( bool * taken, rmw_subscription_allocation_t * allocation) { - static_cast(subscription); - static_cast(serialized_message); - static_cast(taken); static_cast(allocation); - return RMW_RET_UNSUPPORTED; + + rmw_message_info_t dummy_msg_info; + + return __rmw_take_serialized(subscription, serialized_message, taken, &dummy_msg_info); } //============================================================================== @@ -1660,12 +1723,9 @@ rmw_take_serialized_message_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - static_cast(subscription); - static_cast(serialized_message); - static_cast(taken); - static_cast(message_info); static_cast(allocation); - return RMW_RET_UNSUPPORTED; + + return __rmw_take_serialized(subscription, serialized_message, taken, message_info); } //==============================================================================