diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp new file mode 100644 index 00000000..ce45f71a --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -0,0 +1,86 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ + +#include +#include +#include + +#include "rcutils/allocator.h" + +class BufferPool +{ +public: + BufferPool() = default; + + uint8_t * allocate(rcutils_allocator_t *allocator, size_t size) + { + // FIXME(fuzzypixelz): indeed, this methods leaks all allocated buffers ;) + std::lock_guard guard(mutex_); + + if (available_buffers_.empty()) { + uint8_t * data = static_cast(allocator->allocate(size, allocator->state)); + if (data == nullptr) { + return nullptr; + } + Buffer buffer; + buffer.data = data; + buffer.size = size; + buffers_.push_back(buffer); + return data; + } else { + size_t available_buffer = available_buffers_.back(); + Buffer & buffer = buffers_.at(available_buffer); + if (buffer.size < size) { + uint8_t * data = static_cast(allocator->reallocate( + buffer.data, size, allocator->state)); + if (data == nullptr) { + return nullptr; + } + buffer.data = data; + buffer.size = size; + } + available_buffers_.pop_back(); + return buffer.data; + } + } + + void + deallocate(uint8_t *data) + { + std::lock_guard guard(mutex_); + + for (size_t i = 0; i < buffers_.size(); i++) { + if (buffers_.at(i).data == data) { + available_buffers_.push_back(i); + return; + } + } + } + +private: + struct Buffer + { + uint8_t *data; + size_t size; + }; + + std::vector buffers_; + std::vector available_buffers_; + std::mutex mutex_; +}; + +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index ee30b3e0..1d444915 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ struct rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f54c50ab..625e5905 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -227,6 +227,12 @@ PublisherData::PublisherData( events_mgr_ = std::make_shared(); } +void delete_z_bytes(void *data, void *context) +{ + BufferPool *pool = reinterpret_cast(context); + pool->deallocate(static_cast(data)); +} + ///============================================================================= rmw_ret_t PublisherData::publish( const void * ros_message, @@ -244,7 +250,7 @@ rmw_ret_t PublisherData::publish( type_support_impl_); // To store serialized message byte array. - char * msg_bytes = nullptr; + uint8_t * msg_bytes = nullptr; std::optional shmbuf = std::nullopt; auto always_free_shmbuf = rcpputils::make_scope_exit( [&shmbuf]() { @@ -254,13 +260,7 @@ rmw_ret_t PublisherData::publish( }); rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; - - auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator, &shmbuf]() { - if (msg_bytes && !shmbuf.has_value()) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); + rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); // Get memory from SHM buffer if available. if (shm_provider.has_value()) { @@ -274,21 +274,21 @@ rmw_ret_t PublisherData::publish( if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { shmbuf = std::make_optional(alloc.buf); - msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); + msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); } else { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); return RMW_RET_ERROR; } } else { - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + // Get memory from the buffer pool. + msg_bytes = context_impl->serialization_buffer_pool.allocate(allocator, max_data_length); RMW_CHECK_FOR_NULL_WITH_MSG( msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); } // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); // Object that serializes the data rmw_zenoh_cpp::Cdr ser(fastbuffer); @@ -318,7 +318,8 @@ rmw_ret_t PublisherData::publish( if (shmbuf.has_value()) { z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); } else { - z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); + z_bytes_from_buf(&payload, msg_bytes, data_length, delete_z_bytes, + reinterpret_cast(&context_impl->serialization_buffer_pool)); } z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);