Skip to content

Commit

Permalink
Don't copy contiguous bytes on reception.
Browse files Browse the repository at this point in the history
This uses the slices iterator API of zenoh-c to avoid unecessarily
copying bytes into an owned slice, if and only if the bytes is made
up of exactly one slice.
  • Loading branch information
fuzzypixelz committed Dec 16, 2024
1 parent dd82e84 commit a4be550
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 19 deletions.
92 changes: 92 additions & 0 deletions rmw_zenoh_cpp/src/detail/payload.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__PAYLOAD_HPP_
#define DETAIL__PAYLOAD_HPP_

#include <zenoh.h>

#include <variant>
#include <utility>

namespace rmw_zenoh_cpp
{
///=============================================================================
class Payload
{
public:
explicit Payload(const z_loaned_bytes_t *bytes)
{
z_bytes_slice_iterator_t slices = z_bytes_get_slice_iterator(bytes);
z_view_slice_t view;
z_bytes_slice_iterator_next(&slices, &view);
if (!z_bytes_slice_iterator_next(&slices, &view)) {
z_owned_bytes_t owned_bytes;
z_bytes_clone(&owned_bytes, bytes);
Contiguous slice;
slice.view = view;
slice.owned_bytes = owned_bytes;
bytes_ = slice;
} else {
z_owned_slice_t slice;
z_bytes_to_slice(bytes, &slice);
bytes_ = slice;
}
}

~Payload()
{
if (std::holds_alternative<NonContiguous>(bytes_)) {
z_drop(z_move(std::get<NonContiguous>(bytes_)));
} else {
z_drop(z_move(std::get<Contiguous>(bytes_).owned_bytes));
}
}

const uint8_t * data()
{
if (std::holds_alternative<NonContiguous>(bytes_)) {
z_owned_slice_t owned = std::get<NonContiguous>(bytes_);
return z_slice_data(z_loan(owned));
} else {
z_view_slice_t view = std::get<Contiguous>(bytes_).view;
return z_slice_data(z_loan(view));
}
}

size_t size()
{
if (std::holds_alternative<NonContiguous>(bytes_)) {
z_owned_slice_t owned = std::get<NonContiguous>(bytes_);
return z_slice_len(z_loan(owned));
} else {
z_view_slice_t view = std::get<Contiguous>(bytes_).view;
return z_slice_len(z_loan(view));
}
}

private:
struct Contiguous
{
z_view_slice_t view;
z_owned_bytes_t owned_bytes;
};
using NonContiguous = z_owned_slice_t;
// Is `z_owned_slice_t` in case of a non-contiguous `bytes`
// and `z_view_slice_t` plus a `z_owned_bytes_t` otherwise.
std::variant<NonContiguous, Contiguous> bytes_;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__PAYLOAD_HPP_
23 changes: 7 additions & 16 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data)
AttachmentData attachment(z_sample_attachment(sample));
const z_loaned_bytes_t * payload = z_sample_payload(sample);

z_owned_slice_t slice;
z_bytes_to_slice(payload, &slice);

std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));

sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
slice,
payload,
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment)),
topic_name);
Expand All @@ -79,17 +76,11 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data)

///=============================================================================
SubscriptionData::Message::Message(
z_owned_slice_t p,
const z_loaned_bytes_t * bytes,
uint64_t recv_ts,
AttachmentData && attachment_)
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
}

///=============================================================================
SubscriptionData::Message::~Message()
: payload(Payload(bytes)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
z_drop(z_move(payload));
}

///=============================================================================
Expand Down Expand Up @@ -479,8 +470,8 @@ rmw_ret_t SubscriptionData::take_one_message(
std::unique_ptr<Message> msg_data = std::move(message_queue_.front());
message_queue_.pop_front();

const uint8_t * payload = z_slice_data(z_loan(msg_data->payload));
const size_t payload_len = z_slice_len(z_loan(msg_data->payload));
const uint8_t * payload = msg_data->payload.data();
const size_t payload_len = msg_data->payload.size();

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(
Expand Down Expand Up @@ -530,8 +521,8 @@ rmw_ret_t SubscriptionData::take_serialized_message(
std::unique_ptr<Message> msg_data = std::move(message_queue_.front());
message_queue_.pop_front();

const uint8_t * payload = z_slice_data(z_loan(msg_data->payload));
const size_t payload_len = z_slice_len(z_loan(msg_data->payload));
const uint8_t * payload = msg_data->payload.data();
const size_t payload_len = msg_data->payload.size();

if (serialized_message->buffer_capacity < payload_len) {
rmw_ret_t ret =
Expand Down
7 changes: 4 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "attachment_helpers.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"
#include "payload.hpp"

#include "rcutils/allocator.h"

Expand All @@ -50,13 +51,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
struct Message
{
explicit Message(
z_owned_slice_t p,
const z_loaned_bytes_t * bytes,
uint64_t recv_ts,
AttachmentData && attachment);

~Message();
~Message() = default;

z_owned_slice_t payload;
Payload payload;
uint64_t recv_timestamp;
AttachmentData attachment;
};
Expand Down

0 comments on commit a4be550

Please sign in to comment.