Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port zenoh_c to zenoh_cpp #327

Merged
merged 17 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ jobs:
with:
package-name: |
rmw_zenoh_cpp
zenoh_c_vendor
zenoh_cpp_vendor
target-ros2-distro: ${{ matrix.ROS_DISTRO }}
vcs-repo-file-url: ${{ matrix.BUILD_TYPE == 'source' && env.ROS2_REPOS_FILE_URL || '' }}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Build `rmw_zenoh_cpp`

>Note: By default, we vendor and compile `zenoh-c` with a subset of `zenoh` features.
The `ZENOHC_CARGO_FLAGS` CMake argument may be overwritten with other features included if required.
See [zenoh_c_vendor/CMakeLists.txt](./zenoh_c_vendor/CMakeLists.txt) for more details.
See [zenoh_cpp_vendor/CMakeLists.txt](./zenoh_cpp_vendor/CMakeLists.txt) for more details.

```bash
mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src
Expand Down
7 changes: 3 additions & 4 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(zenoh_c_vendor REQUIRED)
find_package(zenoh_cpp_vendor REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
Expand All @@ -45,7 +45,6 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
Expand All @@ -68,7 +67,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
zenohc::lib
zenohcxx::zenohc
)

configure_rmw_library(rmw_zenoh_cpp)
Expand Down Expand Up @@ -130,7 +129,7 @@ target_link_libraries(rmw_zenohd
rcutils::rcutils
rcpputils::rcpputils
rmw::rmw
zenohc::lib
zenohcxx::zenohc
)

install(
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

<buildtool_depend>ament_cmake</buildtool_depend>

<build_depend>zenoh_c_vendor</build_depend>
<build_export_depend>zenoh_c_vendor</build_export_depend>
<build_depend>zenoh_cpp_vendor</build_depend>
<build_export_depend>zenoh_cpp_vendor</build_export_depend>

<depend>ament_index_cpp</depend>
<depend>fastcdr</depend>
Expand Down
122 changes: 47 additions & 75 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <zenoh.h>

#include <cstdlib>
#include <cstring>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <zenoh.hxx>

#include "rmw/types.h"

Expand All @@ -27,75 +29,24 @@

namespace rmw_zenoh_cpp
{
///=============================================================================

AttachmentData::AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE])
const std::vector<uint8_t> source_gid)
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp)
source_timestamp_(source_timestamp),
source_gid_(source_gid),
gid_hash_(hash_gid(source_gid))
{
memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE);
gid_hash_ = hash_gid(source_gid_);
}

///=============================================================================
AttachmentData::AttachmentData(AttachmentData && data)
: sequence_number_(std::move(data.sequence_number_)),
source_timestamp_(std::move(data.source_timestamp_)),
gid_hash_(std::move(data.gid_hash_))
{
memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE);
}

///=============================================================================
AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment)
{
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;

// Deserialize the sequence_number
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "sequence_number")
{
throw std::runtime_error("sequence_number is not found in the attachment.");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) {
throw std::runtime_error("Failed to deserialize the sequence_number.");
}

// Deserialize the source_timestamp
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "source_timestamp")
{
throw std::runtime_error("source_timestamp is not found in the attachment");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) {
throw std::runtime_error("Failed to deserialize the source_timestamp.");
}

// Deserialize the source_gid
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") {
throw std::runtime_error("Invalid attachment: the key source_gid is not found");
}
z_drop(z_move(key));
z_owned_slice_t slice;
if (ze_deserializer_deserialize_slice(&deserializer, &slice)) {
throw std::runtime_error("Failed to deserialize the source_gid.");
}
if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
throw std::runtime_error("The length of source_gid mismatched.");
}
memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
z_drop(z_move(slice));
gid_hash_ = hash_gid(this->source_gid_);
gid_hash_ = std::move(data.gid_hash_);
sequence_number_ = std::move(data.sequence_number_);
source_timestamp_ = std::move(data.source_timestamp_);
source_gid_ = data.source_gid_;
}

///=============================================================================
Expand All @@ -111,9 +62,9 @@ int64_t AttachmentData::source_timestamp() const
}

///=============================================================================
void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
std::vector<uint8_t> AttachmentData::copy_gid() const
{
memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE);
return source_gid_;
}

///=============================================================================
Expand All @@ -122,17 +73,38 @@ size_t AttachmentData::gid_hash() const
return gid_hash_;
}

///=============================================================================
void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment)
zenoh::Bytes AttachmentData::serialize_to_zbytes()
{
auto serializer = zenoh::ext::Serializer();
serializer.serialize(std::string("sequence_number"));
serializer.serialize(this->sequence_number_);
serializer.serialize(std::string("source_timestamp"));
serializer.serialize(this->source_timestamp_);
serializer.serialize(std::string("source_gid"));
serializer.serialize(this->source_gid_);
return std::move(serializer).finish();
}

AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
{
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
zenoh::ext::Deserializer deserializer(bytes);
const auto sequence_number_str = deserializer.deserialize<std::string>();
if (sequence_number_str != "sequence_number") {
throw std::runtime_error("sequence_number is not found in the attachment.");
}
this->sequence_number_ = deserializer.deserialize<int64_t>();

const auto source_timestamp_str = deserializer.deserialize<std::string>();
if (source_timestamp_str != "source_timestamp") {
throw std::runtime_error("source_timestamp is not found in the attachment.");
}
this->source_timestamp_ = deserializer.deserialize<int64_t>();

const auto source_gid_str = deserializer.deserialize<std::string>();
if (source_gid_str != "source_gid") {
throw std::runtime_error("source_gid is not found in the attachment.");
}
this->source_gid_ = deserializer.deserialize<std::vector<uint8_t>>();
gid_hash_ = hash_gid(this->source_gid_);
}
} // namespace rmw_zenoh_cpp
16 changes: 10 additions & 6 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_

#include <zenoh.h>
#include <cstdint>
#include <vector>

#include <zenoh.hxx>

#include "rmw/types.h"

Expand All @@ -28,21 +31,22 @@ class AttachmentData final
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE]);
explicit AttachmentData(const z_loaned_bytes_t *);
const std::vector<uint8_t> source_gid);

explicit AttachmentData(const zenoh::Bytes & bytes);
clalancette marked this conversation as resolved.
Show resolved Hide resolved
explicit AttachmentData(AttachmentData && data);

int64_t sequence_number() const;
int64_t source_timestamp() const;
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
std::vector<uint8_t> copy_gid() const;
size_t gid_hash() const;

void serialize_to_zbytes(z_owned_bytes_t *);
zenoh::Bytes serialize_to_zbytes();

private:
int64_t sequence_number_;
int64_t source_timestamp_;
uint8_t source_gid_[RMW_GID_STORAGE_SIZE];
std::vector<uint8_t> source_gid_;
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ TopicData::TopicData(ConstEntityPtr entity)
}

///=============================================================================
GraphCache::GraphCache(const z_id_t & zid)
: zid_str_(liveliness::zid_to_str(zid))
GraphCache::GraphCache(const zenoh::Id & zid)
: zid_str_(zid.to_string())
{
// Do nothing.
}
Expand Down Expand Up @@ -1169,7 +1169,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
}

memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE);
entity->copy_gid(ep.endpoint_gid);
memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE);

endpoints.push_back(ep);
}
Expand Down Expand Up @@ -1202,7 +1202,7 @@ rmw_ret_t GraphCache::service_server_is_available(
service_it->second.find(client_topic_info.type_);
if (type_it != service_it->second.end()) {
for (const auto & [_, topic_data] : type_it->second) {
if (topic_data->subs_.size() > 0) {
if (!topic_data->subs_.empty()) {
*is_available = true;
return RMW_RET_OK;
}
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/names_and_types.h"

#include <zenoh/api/id.hxx>

namespace rmw_zenoh_cpp
{
Expand Down Expand Up @@ -110,7 +111,7 @@ class GraphCache final
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
/// so that appropriate event callbacks may be triggered.
explicit GraphCache(const z_id_t & zid);
explicit GraphCache(const zenoh::Id & zid);

// Parse a PUT message over a token's key-expression and update the graph.
void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false);
Expand Down
33 changes: 13 additions & 20 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <utility>
#include <vector>

#include <zenoh.hxx>

#include "logging_macros.hpp"
#include "qos.hpp"
#include "simplified_xxhash3.hpp"
Expand Down Expand Up @@ -371,16 +373,6 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
return qos;
}

///=============================================================================
std::string zid_to_str(const z_id_t & id)
{
z_owned_string_t z_str;
z_id_to_string(&id, &z_str);
std::string str(z_string_data(z_loan(z_str)), z_string_len(z_loan(z_str)));
z_drop(z_move(z_str));
return str;
}

///=============================================================================
std::string subscription_token(size_t domain_id)
{
Expand Down Expand Up @@ -445,16 +437,18 @@ Entity::Entity(
// returned to the RMW layer as necessary.
simplified_XXH128_hash_t keyexpr_gid =
simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length());
memcpy(this->gid_, &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
memcpy(this->gid_ + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64));
this->gid_.resize(RMW_GID_STORAGE_SIZE);
memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64,
sizeof(keyexpr_gid.high64));

// We also hash the liveliness keyexpression into a size_t that we use to index into our maps.
this->keyexpr_hash_ = hash_gid(this->gid_);
}

///=============================================================================
std::shared_ptr<Entity> Entity::make(
z_id_t zid,
zenoh::Id zid,
const std::string & nid,
const std::string & id,
EntityType type,
Expand All @@ -480,7 +474,7 @@ std::shared_ptr<Entity> Entity::make(

return std::make_shared<Entity>(
Entity{
zid_to_str(zid),
std::string(zid.to_string()),
nid,
id,
std::move(type),
Expand Down Expand Up @@ -638,9 +632,9 @@ std::string Entity::liveliness_keyexpr() const
}

///=============================================================================
void Entity::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
std::vector<uint8_t> Entity::copy_gid() const
{
memcpy(out_gid, gid_, RMW_GID_STORAGE_SIZE);
return gid_;
}

///=============================================================================
Expand Down Expand Up @@ -679,13 +673,12 @@ std::string demangle_name(const std::string & input)
} // namespace liveliness

///=============================================================================
size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE])
size_t hash_gid(const std::vector<uint8_t> gid)
clalancette marked this conversation as resolved.
Show resolved Hide resolved
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(gid[i]);
for (const auto & g : gid) {
hash_str << static_cast<int>(g);
}
return std::hash<std::string>{}(hash_str.str());
}
Expand Down
Loading
Loading