Skip to content

Commit

Permalink
Reuse methods and make publish_serialized_message also thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Sep 30, 2024
1 parent 0712429 commit 6f0186d
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 154 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
src/rmw_get_node_info_and_types.cpp
Expand Down
111 changes: 51 additions & 60 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "message_type_support.hpp"
#include "logging_macros.hpp"
#include "qos.hpp"
#include "zenoh_utils.hpp"

#include "rcpputils/scope_exit.hpp"

Expand All @@ -36,50 +37,6 @@

namespace rmw_zenoh_cpp
{
namespace
{
z_owned_bytes_map_t
create_map_and_set_sequence_num(int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE])
{
z_owned_bytes_map_t map = z_bytes_map_new();
if (!z_check(map)) {
RMW_SET_ERROR_MSG("failed to allocate map for sequence number");
return z_bytes_map_null();
}
auto free_attachment_map = rcpputils::make_scope_exit(
[&map]() {
z_bytes_map_drop(z_move(map));
});

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char seq_id_str[20];
if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) {
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
return z_bytes_map_null();
}
z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str));

auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
char source_ts_str[20];
if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) {
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
return z_bytes_map_null();
}
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str));

z_bytes_t gid_bytes;
gid_bytes.len = RMW_GID_STORAGE_SIZE;
gid_bytes.start = gid;

z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes);

free_attachment_map.cancel();

return map;
}
} // namespace
///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
z_session_t session,
Expand Down Expand Up @@ -332,10 +289,8 @@ rmw_ret_t PublisherData::publish(

const size_t data_length = ser.get_serialized_data_length();

const int64_t sequence_number = sequence_number_++;

z_owned_bytes_map_t map =
create_map_and_set_sequence_num(sequence_number, gid_);
create_map_and_set_sequence_num(sequence_number_++, gid_);
if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
Expand Down Expand Up @@ -373,24 +328,67 @@ rmw_ret_t PublisherData::publish(
}

///=============================================================================
std::size_t PublisherData::guid() const
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zc_owned_shm_manager_t> & /*shm_manager*/)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
rmw_zenoh_cpp::Cdr ser(buffer);
if (!ser.get_cdr().jump(serialized_message->buffer_length)) {
RMW_SET_ERROR_MSG("cannot correctly set serialized buffer");
return RMW_RET_ERROR;
}

std::lock_guard<std::mutex> lock(mutex_);
return entity_->guid();

z_owned_bytes_map_t map =
rmw_zenoh_cpp::create_map_and_set_sequence_num(sequence_number_++, gid_);

if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}
auto free_attachment_map = rcpputils::make_scope_exit(
[&map]() {
z_bytes_map_drop(z_move(map));
});

const size_t data_length = ser.get_serialized_data_length();

// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
z_publisher_put_options_t options = z_publisher_put_options_default();
options.attachment = z_bytes_map_as_attachment(&map);

// Returns 0 if success.
int8_t ret = z_publisher_put(
z_loan(pub_),
serialized_message->buffer,
data_length,
&options);

if (ret) {
RMW_SET_ERROR_MSG("unable to publish message");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

///=============================================================================
liveliness::TopicInfo PublisherData::topic_info() const
std::size_t PublisherData::guid() const
{
std::lock_guard<std::mutex> lock(mutex_);
return entity_->topic_info().value();
return entity_->guid();
}

///=============================================================================
size_t PublisherData::get_next_sequence_number()
liveliness::TopicInfo PublisherData::topic_info() const
{
std::lock_guard<std::mutex> lock(mutex_);
return sequence_number_++;
return entity_->topic_info().value();
}

///=============================================================================
Expand All @@ -400,13 +398,6 @@ const uint8_t * PublisherData::gid() const
return gid_;
}

///=============================================================================
z_publisher_t PublisherData::publisher() const
{
std::lock_guard<std::mutex> lock(mutex_);
return z_loan(pub_);
}

///=============================================================================
bool PublisherData::liveliness_is_valid() const
{
Expand Down
9 changes: 4 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ class PublisherData final
const void * ros_message,
std::optional<zc_owned_shm_manager_t> & shm_manager);

// Get the next sequence number.
size_t get_next_sequence_number();
// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zc_owned_shm_manager_t> & shm_manager);

// Get a copy of the GUID of this PublisherData's liveliness::Entity.
std::size_t guid() const;
Expand All @@ -71,9 +73,6 @@ class PublisherData final
// Get the GID of this PublisherData.
const uint8_t * gid() const;

// Get the Zenoh publisher.
z_publisher_t publisher() const;

// Returns true if liveliness token is still valid.
bool liveliness_is_valid() const;

Expand Down
68 changes: 68 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "zenoh_utils.hpp"

#include <chrono>
#include <cinttypes>

#include "rcpputils/scope_exit.hpp"

#include "rmw/error_handling.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
z_owned_bytes_map_t
create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE])
{
z_owned_bytes_map_t map = z_bytes_map_new();
if (!z_check(map)) {
RMW_SET_ERROR_MSG("failed to allocate map for sequence number");
return z_bytes_map_null();
}
auto free_attachment_map = rcpputils::make_scope_exit(
[&map]() {
z_bytes_map_drop(z_move(map));
});

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char seq_id_str[20];
if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) {
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
return z_bytes_map_null();
}
z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str));

auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
char source_ts_str[20];
if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) {
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
return z_bytes_map_null();
}
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str));

z_bytes_t gid_bytes;
gid_bytes.len = RMW_GID_STORAGE_SIZE;
gid_bytes.start = gid;

z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes);

free_attachment_map.cancel();

return map;
}
} // namespace rmw_zenoh_cpp
30 changes: 30 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__ZENOH_UTILS_HPP_
#define DETAIL__ZENOH_UTILS_HPP_

#include <zenoh.h>

#include "rmw/types.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
z_owned_bytes_map_t
create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE]);

} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_UTILS_HPP_
Loading

0 comments on commit 6f0186d

Please sign in to comment.