diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index a66af168..4fa7b445 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -40,6 +40,6 @@ jobs:
with:
package-name: |
rmw_zenoh_cpp
- zenoh_c_vendor
+ zenoh_cpp_vendor
target-ros2-distro: ${{ matrix.ROS_DISTRO }}
vcs-repo-file-url: ${{ matrix.BUILD_TYPE == 'source' && env.ROS2_REPOS_FILE_URL || '' }}
diff --git a/README.md b/README.md
index 164fe0f8..0359f92b 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ Build `rmw_zenoh_cpp`
>Note: By default, we vendor and compile `zenoh-c` with a subset of `zenoh` features.
The `ZENOHC_CARGO_FLAGS` CMake argument may be overwritten with other features included if required.
-See [zenoh_c_vendor/CMakeLists.txt](./zenoh_c_vendor/CMakeLists.txt) for more details.
+See [zenoh_cpp_vendor/CMakeLists.txt](./zenoh_cpp_vendor/CMakeLists.txt) for more details.
```bash
mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src
diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt
index 89b5a598..185fa804 100644
--- a/rmw_zenoh_cpp/CMakeLists.txt
+++ b/rmw_zenoh_cpp/CMakeLists.txt
@@ -21,7 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
-find_package(zenoh_c_vendor REQUIRED)
+find_package(zenoh_cpp_vendor REQUIRED)
add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
@@ -45,7 +45,6 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
- src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
@@ -68,7 +67,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
- zenohc::lib
+ zenohcxx::zenohc
)
configure_rmw_library(rmw_zenoh_cpp)
@@ -130,7 +129,7 @@ target_link_libraries(rmw_zenohd
rcutils::rcutils
rcpputils::rcpputils
rmw::rmw
- zenohc::lib
+ zenohcxx::zenohc
)
install(
diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml
index e1d2a24a..79272665 100644
--- a/rmw_zenoh_cpp/package.xml
+++ b/rmw_zenoh_cpp/package.xml
@@ -15,8 +15,8 @@
ament_cmake
- zenoh_c_vendor
- zenoh_c_vendor
+ zenoh_cpp_vendor
+ zenoh_cpp_vendor
ament_index_cpp
fastcdr
diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
index da2571ce..6f128b47 100644
--- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
+++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include
-
+#include
#include
#include
#include
-#include
+#include
#include
+#include
+
#include "rmw/types.h"
#include "attachment_helpers.hpp"
@@ -27,75 +28,24 @@
namespace rmw_zenoh_cpp
{
-///=============================================================================
+
AttachmentData::AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
- const uint8_t source_gid[RMW_GID_STORAGE_SIZE])
+ const std::array source_gid)
: sequence_number_(sequence_number),
- source_timestamp_(source_timestamp)
+ source_timestamp_(source_timestamp),
+ source_gid_(source_gid),
+ gid_hash_(hash_gid(source_gid))
{
- memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE);
- gid_hash_ = hash_gid(source_gid_);
}
-///=============================================================================
AttachmentData::AttachmentData(AttachmentData && data)
-: sequence_number_(std::move(data.sequence_number_)),
- source_timestamp_(std::move(data.source_timestamp_)),
- gid_hash_(std::move(data.gid_hash_))
-{
- memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE);
-}
-
-///=============================================================================
-AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment)
{
- ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
- z_owned_string_t key;
-
- // Deserialize the sequence_number
- ze_deserializer_deserialize_string(&deserializer, &key);
- if (std::string_view(
- z_string_data(z_loan(key)),
- z_string_len(z_loan(key))) != "sequence_number")
- {
- throw std::runtime_error("sequence_number is not found in the attachment.");
- }
- z_drop(z_move(key));
- if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) {
- throw std::runtime_error("Failed to deserialize the sequence_number.");
- }
-
- // Deserialize the source_timestamp
- ze_deserializer_deserialize_string(&deserializer, &key);
- if (std::string_view(
- z_string_data(z_loan(key)),
- z_string_len(z_loan(key))) != "source_timestamp")
- {
- throw std::runtime_error("source_timestamp is not found in the attachment");
- }
- z_drop(z_move(key));
- if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) {
- throw std::runtime_error("Failed to deserialize the source_timestamp.");
- }
-
- // Deserialize the source_gid
- ze_deserializer_deserialize_string(&deserializer, &key);
- if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") {
- throw std::runtime_error("Invalid attachment: the key source_gid is not found");
- }
- z_drop(z_move(key));
- z_owned_slice_t slice;
- if (ze_deserializer_deserialize_slice(&deserializer, &slice)) {
- throw std::runtime_error("Failed to deserialize the source_gid.");
- }
- if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
- throw std::runtime_error("The length of source_gid mismatched.");
- }
- memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
- z_drop(z_move(slice));
- gid_hash_ = hash_gid(this->source_gid_);
+ gid_hash_ = std::move(data.gid_hash_);
+ sequence_number_ = std::move(data.sequence_number_);
+ source_timestamp_ = std::move(data.source_timestamp_);
+ source_gid_ = data.source_gid_;
}
///=============================================================================
@@ -111,9 +61,9 @@ int64_t AttachmentData::source_timestamp() const
}
///=============================================================================
-void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
+std::array AttachmentData::copy_gid() const
{
- memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE);
+ return source_gid_;
}
///=============================================================================
@@ -122,17 +72,38 @@ size_t AttachmentData::gid_hash() const
return gid_hash_;
}
-///=============================================================================
-void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment)
+zenoh::Bytes AttachmentData::serialize_to_zbytes()
+{
+ auto serializer = zenoh::ext::Serializer();
+ serializer.serialize(std::string("sequence_number"));
+ serializer.serialize(this->sequence_number_);
+ serializer.serialize(std::string("source_timestamp"));
+ serializer.serialize(this->source_timestamp_);
+ serializer.serialize(std::string("source_gid"));
+ serializer.serialize(this->source_gid_);
+ return std::move(serializer).finish();
+}
+
+AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
{
- ze_owned_serializer_t serializer;
- ze_serializer_empty(&serializer);
- ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
- ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_);
- ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
- ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_);
- ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
- ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE);
- ze_serializer_finish(z_move(serializer), attachment);
+ zenoh::ext::Deserializer deserializer(bytes);
+ const auto sequence_number_str = deserializer.deserialize();
+ if (sequence_number_str != "sequence_number") {
+ throw std::runtime_error("sequence_number is not found in the attachment.");
+ }
+ this->sequence_number_ = deserializer.deserialize();
+
+ const auto source_timestamp_str = deserializer.deserialize();
+ if (source_timestamp_str != "source_timestamp") {
+ throw std::runtime_error("source_timestamp is not found in the attachment.");
+ }
+ this->source_timestamp_ = deserializer.deserialize();
+
+ const auto source_gid_str = deserializer.deserialize();
+ if (source_gid_str != "source_gid") {
+ throw std::runtime_error("source_gid is not found in the attachment.");
+ }
+ this->source_gid_ = deserializer.deserialize>();
+ gid_hash_ = hash_gid(this->source_gid_);
}
} // namespace rmw_zenoh_cpp
diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
index ec460620..64f444d1 100644
--- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
+++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
@@ -15,7 +15,10 @@
#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_
-#include
+#include
+#include
+
+#include
#include "rmw/types.h"
@@ -28,21 +31,22 @@ class AttachmentData final
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
- const uint8_t source_gid[RMW_GID_STORAGE_SIZE]);
- explicit AttachmentData(const z_loaned_bytes_t *);
+ const std::array source_gid);
+
+ explicit AttachmentData(const zenoh::Bytes & bytes);
explicit AttachmentData(AttachmentData && data);
int64_t sequence_number() const;
int64_t source_timestamp() const;
- void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
+ std::array copy_gid() const;
size_t gid_hash() const;
- void serialize_to_zbytes(z_owned_bytes_t *);
+ zenoh::Bytes serialize_to_zbytes();
private:
int64_t sequence_number_;
int64_t source_timestamp_;
- uint8_t source_gid_[RMW_GID_STORAGE_SIZE];
+ std::array source_gid_;
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp
diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp
index 9401a5c8..a1af2d46 100644
--- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp
+++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp
@@ -71,8 +71,8 @@ TopicData::TopicData(ConstEntityPtr entity)
}
///=============================================================================
-GraphCache::GraphCache(const z_id_t & zid)
-: zid_str_(liveliness::zid_to_str(zid))
+GraphCache::GraphCache(const zenoh::Id & zid)
+: zid_str_(zid.to_string())
{
// Do nothing.
}
@@ -1168,8 +1168,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
}
}
- memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE);
- entity->copy_gid(ep.endpoint_gid);
+ memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE);
endpoints.push_back(ep);
}
@@ -1202,7 +1201,7 @@ rmw_ret_t GraphCache::service_server_is_available(
service_it->second.find(client_topic_info.type_);
if (type_it != service_it->second.end()) {
for (const auto & [_, topic_data] : type_it->second) {
- if (topic_data->subs_.size() > 0) {
+ if (!topic_data->subs_.empty()) {
*is_available = true;
return RMW_RET_OK;
}
diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp
index 8ac2172d..6c4f6e54 100644
--- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp
+++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp
@@ -36,6 +36,7 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/names_and_types.h"
+#include
namespace rmw_zenoh_cpp
{
@@ -110,7 +111,7 @@ class GraphCache final
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
/// so that appropriate event callbacks may be triggered.
- explicit GraphCache(const z_id_t & zid);
+ explicit GraphCache(const zenoh::Id & zid);
// Parse a PUT message over a token's key-expression and update the graph.
void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false);
diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
index 52c8ec7f..c1792091 100644
--- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
+++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
@@ -26,6 +26,8 @@
#include
#include
+#include
+
#include "logging_macros.hpp"
#include "qos.hpp"
#include "simplified_xxhash3.hpp"
@@ -371,16 +373,6 @@ std::optional keyexpr_to_qos(const std::string & keyexpr)
return qos;
}
-///=============================================================================
-std::string zid_to_str(const z_id_t & id)
-{
- z_owned_string_t z_str;
- z_id_to_string(&id, &z_str);
- std::string str(z_string_data(z_loan(z_str)), z_string_len(z_loan(z_str)));
- z_drop(z_move(z_str));
- return str;
-}
-
///=============================================================================
std::string subscription_token(size_t domain_id)
{
@@ -445,8 +437,9 @@ Entity::Entity(
// returned to the RMW layer as necessary.
simplified_XXH128_hash_t keyexpr_gid =
simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length());
- memcpy(this->gid_, &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
- memcpy(this->gid_ + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64));
+ memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
+ memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64,
+ sizeof(keyexpr_gid.high64));
// We also hash the liveliness keyexpression into a size_t that we use to index into our maps.
this->keyexpr_hash_ = hash_gid(this->gid_);
@@ -454,7 +447,7 @@ Entity::Entity(
///=============================================================================
std::shared_ptr Entity::make(
- z_id_t zid,
+ zenoh::Id zid,
const std::string & nid,
const std::string & id,
EntityType type,
@@ -480,7 +473,7 @@ std::shared_ptr Entity::make(
return std::make_shared(
Entity{
- zid_to_str(zid),
+ std::string(zid.to_string()),
nid,
id,
std::move(type),
@@ -638,9 +631,9 @@ std::string Entity::liveliness_keyexpr() const
}
///=============================================================================
-void Entity::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
+std::array Entity::copy_gid() const
{
- memcpy(out_gid, gid_, RMW_GID_STORAGE_SIZE);
+ return gid_;
}
///=============================================================================
@@ -679,13 +672,12 @@ std::string demangle_name(const std::string & input)
} // namespace liveliness
///=============================================================================
-size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE])
+size_t hash_gid(const std::array gid)
{
std::stringstream hash_str;
hash_str << std::hex;
- size_t i = 0;
- for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
- hash_str << static_cast(gid[i]);
+ for (const auto & g : gid) {
+ hash_str << static_cast(g);
}
return std::hash{}(hash_str.str());
}
diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
index 895ae604..81880df6 100644
--- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
+++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
@@ -15,8 +15,7 @@
#ifndef DETAIL__LIVELINESS_UTILS_HPP_
#define DETAIL__LIVELINESS_UTILS_HPP_
-#include
-
+#include
#include
#include
#include
@@ -25,6 +24,8 @@
#include
#include
+#include
+
#include "rmw/types.h"
namespace rmw_zenoh_cpp
@@ -127,7 +128,7 @@ class Entity
/// @param topic_info An optional topic information for relevant entities.
/// @return An entity if all inputs are valid. This way no invalid entities can be created.
static EntityPtr make(
- z_id_t zid,
+ zenoh::Id zid,
const std::string & nid,
const std::string & id,
EntityType type,
@@ -172,7 +173,7 @@ class Entity
// Two entities are equal if their keyexpr_hash are equal.
bool operator==(const Entity & other) const;
- void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
+ std::array copy_gid() const;
private:
Entity(
@@ -191,7 +192,7 @@ class Entity
NodeInfo node_info_;
std::optional topic_info_;
std::string liveliness_keyexpr_;
- uint8_t gid_[RMW_GID_STORAGE_SIZE];
+ std::array gid_{};
};
///=============================================================================
@@ -231,15 +232,10 @@ std::string qos_to_keyexpr(const rmw_qos_profile_t & qos);
///=============================================================================
/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid.
std::optional keyexpr_to_qos(const std::string & keyexpr);
-
-///=============================================================================
-/// Convert a Zenoh id to a string.
-std::string zid_to_str(const z_id_t & id);
} // namespace liveliness
///=============================================================================
-/// Generate a hash for a given GID.
-size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]);
+size_t hash_gid(const std::array gid);
} // namespace rmw_zenoh_cpp
///=============================================================================
diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
index 1236d453..1c24570d 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
@@ -16,6 +16,7 @@
#include
+#include
#include
#include
#include
@@ -25,6 +26,9 @@
#include
#include
#include
+#include
+
+#include
#include "attachment_helpers.hpp"
#include "cdr.hpp"
@@ -40,82 +44,11 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"
-namespace
-{
-///=============================================================================
-struct ClientDataWrapper
-{
- explicit ClientDataWrapper(std::shared_ptr data)
- : client_data(std::move(data))
- {
- }
-
- std::shared_ptr client_data;
-};
-
-///=============================================================================
-void client_data_handler(z_loaned_reply_t * reply, void * data)
-{
- auto wrapper = static_cast(data);
- if (wrapper == nullptr) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to obtain client_data_t from data in client_data_handler."
- );
- return;
- }
-
- if (wrapper->client_data->is_shutdown()) {
- return;
- }
-
- if (!z_reply_is_ok(reply)) {
- const z_loaned_reply_err_t * err = z_reply_err(reply);
- const z_loaned_bytes_t * err_payload = z_reply_err_payload(err);
-
- z_owned_string_t err_str;
- z_bytes_to_string(err_payload, &err_str);
-
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
- wrapper->client_data->topic_info().topic_keyexpr_.c_str(),
- static_cast(z_string_len(z_loan(err_str))),
- z_string_data(z_loan(err_str)));
- z_drop(z_move(err_str));
-
- return;
- }
-
- std::chrono::nanoseconds::rep received_timestamp =
- std::chrono::system_clock::now().time_since_epoch().count();
-
- wrapper->client_data->add_new_reply(
- std::make_unique(reply, received_timestamp));
-}
-
-///=============================================================================
-void client_data_drop(void * data)
-{
- auto wrapper = static_cast(data);
- if (wrapper == nullptr) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "Unable to obtain client_data_t from data in client_data_drop."
- );
- return;
- }
-
- delete wrapper;
-}
-
-} // namespace
-
namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr ClientData::make(
- std::shared_ptr session,
+ std::shared_ptr session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
@@ -177,7 +110,7 @@ std::shared_ptr ClientData::make(
std::size_t domain_id = node_info.domain_id_;
auto entity = liveliness::Entity::make(
- z_info_zid(session->loan()),
+ session->get_zid(),
std::to_string(node_id),
std::to_string(service_id),
liveliness::EntityType::Client,
@@ -209,11 +142,6 @@ std::shared_ptr ClientData::make(
response_type_support
});
- if (!client_data->init(session)) {
- // init() already set the error.
- return nullptr;
- }
-
return client_data;
}
@@ -222,7 +150,7 @@ ClientData::ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * rmw_client,
std::shared_ptr entity,
- std::shared_ptr sess,
+ std::shared_ptr sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr request_type_support,
@@ -240,51 +168,22 @@ ClientData::ClientData(
is_shutdown_(false),
initialized_(false)
{
- // Do nothing.
-}
-
-///=============================================================================
-bool ClientData::init(std::shared_ptr session)
-{
- if (z_keyexpr_from_str(
- &this->keyexpr_,
- this->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK)
- {
- RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
- return false;
- }
- auto free_ros_keyexpr = rcpputils::make_scope_exit(
- [this]() {
- z_drop(z_move(this->keyexpr_));
- });
-
+ std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
+ keyexpr_ = zenoh::KeyExpr(topic_keyexpr);
std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
- z_view_keyexpr_t liveliness_ke;
- z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
- if (z_liveliness_declare_token(
- session->loan(),
- &this->token_,
- z_loan(liveliness_ke),
- NULL
- ) != Z_OK)
- {
+ zenoh::ZResult result;
+ this->token_ = sess_->liveliness_declare_token(
+ zenoh::KeyExpr(liveliness_keyexpr),
+ zenoh::Session::LivelinessDeclarationOptions::create_default(),
+ &result);
+ if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
- return false;
+ throw std::runtime_error("Unable to create liveliness token for the client.");
}
- auto free_token = rcpputils::make_scope_exit(
- [this]() {
- z_drop(z_move(this->token_));
- });
- sess_ = session;
initialized_ = true;
-
- free_ros_keyexpr.cancel();
- free_token.cancel();
-
- return true;
}
///=============================================================================
@@ -305,10 +204,9 @@ bool ClientData::liveliness_is_valid() const
}
///=============================================================================
-void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
+std::array ClientData::copy_gid() const
{
- std::lock_guard lock(mutex_);
- entity_->copy_gid(out_gid);
+ return entity_->copy_gid();
}
///=============================================================================
@@ -321,15 +219,12 @@ void ClientData::add_new_reply(std::unique_ptr reply)
reply_queue_.size() >= adapted_qos_profile.depth)
{
// Log warning if message is discarded due to hitting the queue depth
- z_view_string_t keystr;
- z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr);
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Query queue depth of %ld reached, discarding oldest Query "
- "for client for %.*s",
+ "for client for %s",
adapted_qos_profile.depth,
- static_cast(z_string_len(z_loan(keystr))),
- z_string_data(z_loan(keystr)));
+ std::string(keyexpr_.value().as_string_view()).c_str());
reply_queue_.pop_front();
}
reply_queue_.emplace_back(std::move(reply));
@@ -359,18 +254,34 @@ rmw_ret_t ClientData::take_response(
std::unique_ptr latest_reply = std::move(reply_queue_.front());
reply_queue_.pop_front();
- if (!latest_reply->get_sample().has_value()) {
+ auto & reply = latest_reply->get_sample();
+
+ if (!reply.is_ok()) {
RMW_SET_ERROR_MSG("invalid reply sample");
return RMW_RET_ERROR;
}
- const z_loaned_sample_t * sample = latest_reply->get_sample().value();
+
+ const zenoh::Sample & sample = reply.get_ok();
// Object that manages the raw buffer
- z_owned_slice_t payload;
- z_bytes_to_slice(z_sample_payload(sample), &payload);
+ std::vector payload = sample.get_payload().as_vector();
+ if (payload.size() == 0) {
+ RMW_ZENOH_LOG_DEBUG_NAMED(
+ "rmw_zenoh_cpp",
+ "ClientData not able to get slice data");
+ return RMW_RET_ERROR;
+ }
+
+ // Fill in the request_header
+ if (!sample.get_attachment().has_value()) {
+ RMW_ZENOH_LOG_DEBUG_NAMED(
+ "rmw_zenoh_cpp",
+ "ClientData take_request attachment is empty");
+ return RMW_RET_ERROR;
+ }
+
eprosima::fastcdr::FastBuffer fastbuffer(
- reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))),
- z_slice_len(z_loan(payload)));
+ reinterpret_cast(const_cast(payload.data())), payload.size());
// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
@@ -383,8 +294,7 @@ rmw_ret_t ClientData::take_response(
return RMW_RET_ERROR;
}
- // Fill in the request_header
- AttachmentData attachment(z_sample_attachment(sample));
+ rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get()));
request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
@@ -395,10 +305,12 @@ rmw_ret_t ClientData::take_response(
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
- attachment.copy_gid(request_header->request_id.writer_guid);
+ memcpy(
+ request_header->request_id.writer_guid,
+ attachment.copy_gid().data(),
+ RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();
- z_drop(z_move(payload));
*taken = true;
return RMW_RET_OK;
@@ -452,16 +364,9 @@ rmw_ret_t ClientData::send_request(
*sequence_id = sequence_number_++;
// Send request
- z_get_options_t opts;
- z_get_options_default(&opts);
- z_owned_bytes_t attachment;
- uint8_t local_gid[RMW_GID_STORAGE_SIZE];
- entity_->copy_gid(local_gid);
- create_map_and_set_sequence_num(
- &attachment, *sequence_id,
- local_gid);
- opts.attachment = z_move(attachment);
-
+ zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default();
+ std::array local_gid = entity_->copy_gid();
+ opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
// this window, the queryable will return an invalid reply. However, it is common for actions,
@@ -471,23 +376,57 @@ rmw_ret_t ClientData::send_request(
// Latest consolidation guarantees unicity of replies for the same key expression,
// which optimizes bandwidth. The default is "None", which imples replies may come in any order
// and any number.
- opts.consolidation = z_query_consolidation_latest();
- z_owned_bytes_t payload;
- z_bytes_copy_from_buf(
- &payload, reinterpret_cast(request_bytes), data_length);
- opts.payload = z_move(payload);
-
- // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
- // capture shared_from_this() instead of this.
- ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this());
- z_owned_closure_reply_t zn_closure_reply;
- z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper);
- z_get(
- sess_->loan(),
- z_loan(keyexpr_), "",
- z_move(zn_closure_reply),
- &opts);
-
+ opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE;
+
+ std::vector raw_bytes(
+ reinterpret_cast(request_bytes),
+ reinterpret_cast(request_bytes) + data_length);
+ opts.payload = zenoh::Bytes(std::move(raw_bytes));
+
+ std::weak_ptr client_data = shared_from_this();
+ zenoh::ZResult result;
+ std::string parameters;
+ context_impl->session()->get(
+ keyexpr_.value(),
+ parameters,
+ [client_data](const zenoh::Reply & reply) {
+ if (!reply.is_ok()) {
+ auto reply_err_str = reply.get_err().get_payload().as_string();
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "z_reply_is_ok returned False Reason: %s",
+ reply_err_str.c_str())
+ return;
+ }
+ const zenoh::Sample & sample = reply.get_ok();
+
+ auto sub_data = client_data.lock();
+ if (sub_data == nullptr) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Unable to obtain ClientData from data for %s.",
+ std::string(sample.get_keyexpr().as_string_view()).c_str());
+ return;
+ }
+
+ if (sub_data->is_shutdown()) {
+ return;
+ }
+
+ std::chrono::time_point now = std::chrono::system_clock::now();
+
+ sub_data->add_new_reply(
+ std::make_unique(reply, now.time_since_epoch().count()));
+ },
+ zenoh::closures::none,
+ std::move(opts),
+ &result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_DEBUG_NAMED(
+ "rmw_zenoh_cpp",
+ "ClientData unable to call get");
+ return RMW_RET_ERROR;
+ }
return RMW_RET_OK;
}
@@ -545,8 +484,14 @@ rmw_ret_t ClientData::shutdown()
// Unregister this node from the ROS graph.
if (initialized_) {
- z_liveliness_undeclare_token(z_move(token_));
- z_drop(z_move(keyexpr_));
+ zenoh::ZResult result;
+ std::move(token_).value().undeclare(&result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Unable to undeclare liveliness token");
+ return RMW_RET_ERROR;
+ }
}
sess_.reset();
diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
index f582f539..44b187ba 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
@@ -15,8 +15,7 @@
#ifndef DETAIL__RMW_CLIENT_DATA_HPP_
#define DETAIL__RMW_CLIENT_DATA_HPP_
-#include
-
+#include
#include
#include
#include
@@ -26,6 +25,8 @@
#include
#include
+#include
+
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
@@ -47,7 +48,7 @@ class ClientData final : public std::enable_shared_from_this
public:
// Make a shared_ptr of ClientData.
static std::shared_ptr make(
- std::shared_ptr session,
+ std::shared_ptr session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
@@ -64,7 +65,7 @@ class ClientData final : public std::enable_shared_from_this
bool liveliness_is_valid() const;
// Copy the GID of this ClientData into an rmw_gid_t.
- void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
+ std::array copy_gid() const;
// Add a new ZenohReply to the queue.
void add_new_reply(std::unique_ptr reply);
@@ -107,15 +108,12 @@ class ClientData final : public std::enable_shared_from_this
const rmw_node_t * rmw_node,
const rmw_client_t * client,
std::shared_ptr entity,
- std::shared_ptr sess,
+ std::shared_ptr sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr request_type_support,
std::shared_ptr response_type_support);
- // Initialize the Zenoh objects for this entity.
- bool init(std::shared_ptr session);
-
// Internal mutex.
mutable std::recursive_mutex mutex_;
// The parent node.
@@ -124,11 +122,11 @@ class ClientData final : public std::enable_shared_from_this
// The Entity generated for the service.
std::shared_ptr entity_;
// A shared session.
- std::shared_ptr sess_;
+ std::shared_ptr sess_;
// An owned keyexpression.
- z_owned_keyexpr_t keyexpr_;
+ std::optional keyexpr_;
// Liveliness token for the service.
- z_owned_liveliness_token_t token_;
+ std::optional token_;
// Type support fields.
const void * request_type_support_impl_;
const void * response_type_support_impl_;
diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
index e800ea1f..206d2d3f 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
@@ -26,13 +26,14 @@
#include
#include
+#include
+
#include "graph_cache.hpp"
#include "guard_condition.hpp"
#include "identifier.hpp"
#include "logging_macros.hpp"
#include "rmw_node_data.hpp"
#include "zenoh_config.hpp"
-#include "zenoh_router_check.hpp"
#include "rcpputils/scope_exit.hpp"
#include "rmw/error_handling.h"
@@ -49,8 +50,6 @@ static std::mutex data_to_data_shared_ptr_map_mutex;
static std::unordered_map> data_to_data_shared_ptr_map;
-static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data);
-
// Bundle all class members into a data struct which can be passed as a
// weak ptr to various threads for thread-safe access without capturing
// "this" ptr by reference.
@@ -68,49 +67,49 @@ class rmw_context_impl_s::Data final
nodes_({})
{
// Initialize the zenoh configuration.
- z_owned_config_t config;
- rmw_ret_t ret;
- if ((ret =
- rmw_zenoh_cpp::get_z_config(
- rmw_zenoh_cpp::ConfigurableEntity::Session,
- &config)) != RMW_RET_OK)
- {
+ std::optional config = rmw_zenoh_cpp::get_z_config(
+ rmw_zenoh_cpp::ConfigurableEntity::Session);
+
+ if (!config.has_value()) {
throw std::runtime_error("Error configuring Zenoh session.");
}
+ zenoh::ZResult result;
+
#ifndef _MSC_VER
// Check if shm is enabled.
- z_owned_string_t shm_enabled;
- zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
- auto always_free_shm_enabled = rcpputils::make_scope_exit(
- [&shm_enabled]() {
- z_drop(z_move(shm_enabled));
- });
+ std::string shm_enabled = config.value().get(Z_CONFIG_SHARED_MEMORY_KEY, &result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Not able to get %s from the config file",
+ Z_CONFIG_SHARED_MEMORY_KEY);
+ }
#endif
// Initialize the zenoh session.
- z_owned_session_t raw_session;
- if (z_open(&raw_session, z_move(config), NULL) != Z_OK) {
- RMW_SET_ERROR_MSG("Error setting up zenoh session.");
- throw std::runtime_error("Error setting up zenoh session.");
- }
- if (session_ != nullptr) {
- session_.reset();
+ session_ = std::make_shared(
+ std::move(config.value()),
+ zenoh::Session::SessionOptions::create_default(),
+ &result);
+ if (result != Z_OK) {
+ throw std::runtime_error("Error setting up zenoh session. ");
}
- session_ = std::make_shared(raw_session);
- auto close_session = rcpputils::make_scope_exit(
- [&raw_session]() {
- z_close(z_loan_mut(raw_session), NULL);
- });
// Verify if the zenoh router is running if configured.
const std::optional configured_connection_attempts =
rmw_zenoh_cpp::zenoh_router_check_attempts();
if (configured_connection_attempts.has_value()) {
uint64_t connection_attempts = 0;
+ // Retry until the connection is successful.
constexpr std::chrono::milliseconds sleep_time(1000);
constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time);
- while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) {
+ do {
+ zenoh::ZResult result;
+ this->session_->get_routers_z_id(&result);
+ if (result == Z_OK) {
+ break;
+ }
if ((connection_attempts % ticks_between_print) == 0) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
@@ -121,12 +120,12 @@ class rmw_context_impl_s::Data final
break;
}
std::this_thread::sleep_for(sleep_time);
- }
+ } while(connection_attempts < configured_connection_attempts.value());
}
// Initialize the graph cache.
- const z_id_t zid = z_info_zid(session_->loan());
- graph_cache_ = std::make_shared(zid);
+ graph_cache_ =
+ std::make_shared(this->session_->get_zid());
// Setup liveliness subscriptions for discovery.
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id);
@@ -145,59 +144,51 @@ class rmw_context_impl_s::Data final
// the code will be simpler, and if we're just going to spin over the non-blocking
// reads until we obtain responses, we'll just be hogging CPU time by convincing
// the OS that we're doing actual work when it could instead park the thread.
- z_owned_fifo_handler_reply_t handler;
- z_owned_closure_reply_t closure;
- z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1);
-
- z_view_keyexpr_t keyexpr;
- z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
- z_liveliness_get(
- session_->loan(), z_loan(keyexpr),
- z_move(closure), NULL);
- z_owned_reply_t reply;
- while (z_recv(z_loan(handler), &reply) == Z_OK) {
- if (z_reply_is_ok(z_loan(reply))) {
- const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply));
- z_view_string_t keystr;
- z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
- std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
- // Ignore tokens from the same session to avoid race conditions from this
- // query and the liveliness subscription.
- graph_cache_->parse_put(std::move(liveliness_str), true);
+
+ // Intuitively use a FIFO channel rather than building it up with a closure and a
+ // handler to FIFO channel
+ zenoh::KeyExpr keyexpr(liveliness_str);
+
+ zenoh::Session::GetOptions options = zenoh::Session::GetOptions::create_default();
+ options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL;
+ options.payload = "";
+
+ auto replies = session_->liveliness_get(
+ keyexpr,
+ zenoh::channels::FifoChannel(SIZE_MAX - 1),
+ zenoh::Session::LivelinessGetOptions::create_default(),
+ &result);
+
+ if (result != Z_OK) {
+ throw std::runtime_error("Error getting liveliness. ");
+ }
+
+ for (auto res = replies.recv(); std::holds_alternative(res);
+ res = replies.recv())
+ {
+ const zenoh::Reply & reply = std::get(res);
+ if (reply.is_ok()) {
+ const auto & sample = reply.get_ok();
+ graph_cache_->parse_put(std::string(sample.get_keyexpr().as_string_view()), true);
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
}
- z_drop(z_move(reply));
}
- z_drop(z_move(handler));
// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
#ifndef _MSC_VER
- if (strncmp(
- z_string_data(z_loan(shm_enabled)),
- "true",
- z_string_len(z_loan(shm_enabled))) == 0)
- {
- // TODO(yuyuan): determine the default alignment of SHM
- z_alloc_alignment_t alignment = {5};
- z_owned_memory_layout_t layout;
- z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);
-
- z_owned_shm_provider_t provider;
- if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
- RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create an SHM provider.");
- throw std::runtime_error("Unable to create an SHM provider.");
+ if (shm_enabled == "true") {
+ auto layout = zenoh::MemoryLayout(
+ SHM_BUFFER_SIZE_MB * 1024 * 1024,
+ zenoh::AllocAlignment({5}));
+ zenoh::PosixShmProvider provider(layout, &result);
+ if (result != Z_OK) {
+ throw std::runtime_error("Unable to create shm provider.");
}
- shm_provider_ = provider;
+ shm_provider_ = std::move(provider);
}
- auto free_shm_provider = rcpputils::make_scope_exit(
- [this]() {
- if (shm_provider_.has_value()) {
- z_drop(z_move(shm_provider_.value()));
- }
- });
#endif
graph_guard_condition_ = std::make_unique();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
@@ -205,30 +196,37 @@ class rmw_context_impl_s::Data final
// Setup the liveliness subscriber to receives updates from the ROS graph
// and update the graph cache.
- z_liveliness_subscriber_options_t sub_options;
- z_liveliness_subscriber_options_default(&sub_options);
- z_owned_closure_sample_t callback;
- z_closure(&callback, graph_sub_data_handler, nullptr, this);
- z_view_keyexpr_t liveliness_ke;
- z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str());
- if (z_liveliness_declare_subscriber(
- session_->loan(),
- &graph_subscriber_, z_loan(liveliness_ke),
- z_move(callback), &sub_options) != Z_OK)
- {
+ zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str());
+ zenoh::Session::LivelinessSubscriberOptions sub_options =
+ zenoh::Session::LivelinessSubscriberOptions::create_default();
+ sub_options.history = true;
+ graph_subscriber_ = session_->liveliness_declare_subscriber(
+ keyexpr_cpp,
+ [&](const zenoh::Sample & sample) {
+ // Look up the data shared_ptr in the global map. If it is in there, use it.
+ // If not, it is being shutdown so we can just ignore this update.
+ std::shared_ptr data_shared_ptr{nullptr};
+ {
+ std::lock_guard lk(data_to_data_shared_ptr_map_mutex);
+ if (data_to_data_shared_ptr_map.count(this) == 0) {
+ return;
+ }
+ data_shared_ptr = data_to_data_shared_ptr_map[this];
+ }
+
+ // Update the graph cache.
+ data_shared_ptr->update_graph_cache(
+ sample,
+ std::string(sample.get_keyexpr().as_string_view()));
+ },
+ zenoh::closures::none,
+ std::move(sub_options),
+ &result);
+
+ if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
throw std::runtime_error("Unable to subscribe to ROS graph updates.");
}
- auto undeclare_z_sub = rcpputils::make_scope_exit(
- [this]() {
- z_undeclare_subscriber(z_move(this->graph_subscriber_));
- });
-
- close_session.cancel();
-#ifndef _MSC_VER
- free_shm_provider.cancel();
-#endif
- undeclare_z_sub.cancel();
}
// Shutdown the Zenoh session.
@@ -241,10 +239,15 @@ class rmw_context_impl_s::Data final
return ret;
}
- z_undeclare_subscriber(z_move(graph_subscriber_));
- if (shm_provider_.has_value()) {
- z_drop(z_move(shm_provider_.value()));
+ zenoh::ZResult result;
+ std::move(graph_subscriber_).value().undeclare(&result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Unable to undeclare liveliness token");
+ return RMW_RET_ERROR;
}
+
is_shutdown_ = true;
// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
@@ -263,13 +266,13 @@ class rmw_context_impl_s::Data final
return enclave_;
}
- std::shared_ptr session() const
+ const std::shared_ptr session() const
{
std::lock_guard lock(mutex_);
return session_;
}
- std::optional & shm_provider()
+ std::optional & shm_provider()
{
std::lock_guard lock(mutex_);
return shm_provider_;
@@ -296,7 +299,7 @@ class rmw_context_impl_s::Data final
bool session_is_valid() const
{
std::lock_guard lock(mutex_);
- return !z_session_is_closed(session_->loan());
+ return !session_->is_closed();
}
std::shared_ptr graph_cache()
@@ -317,7 +320,7 @@ class rmw_context_impl_s::Data final
}
// Check that the Zenoh session is still valid.
- if (z_session_is_closed(session_->loan())) {
+ if (!session_is_valid()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create NodeData as Zenoh session is invalid.");
@@ -327,7 +330,7 @@ class rmw_context_impl_s::Data final
auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
- session_->loan(),
+ session(),
domain_id_,
ns,
node_name,
@@ -361,17 +364,17 @@ class rmw_context_impl_s::Data final
nodes_.erase(node);
}
- void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr)
+ void update_graph_cache(const zenoh::Sample & sample_kind, const std::string & keystr)
{
std::lock_guard lock(mutex_);
if (is_shutdown_) {
return;
}
- switch (sample_kind) {
- case z_sample_kind_t::Z_SAMPLE_KIND_PUT:
+ switch (sample_kind.get_kind()) {
+ case zenoh::SampleKind::Z_SAMPLE_KIND_PUT:
graph_cache_->parse_put(keystr);
break;
- case z_sample_kind_t::Z_SAMPLE_KIND_DELETE:
+ case zenoh::SampleKind::Z_SAMPLE_KIND_DELETE:
graph_cache_->parse_del(keystr);
break;
default:
@@ -383,7 +386,7 @@ class rmw_context_impl_s::Data final
if (RMW_RET_OK != rmw_ret) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
- "[graph_sub_data_handler] Unable to trigger graph guard condition."
+ "[update_graph_cache] Unable to trigger graph guard condition."
);
}
}
@@ -403,15 +406,24 @@ class rmw_context_impl_s::Data final
std::size_t domain_id_;
// Enclave, name used to find security artifacts in a sros2 keystore.
std::string enclave_;
- // A shared session.
- std::shared_ptr session_{nullptr};
+ // An owned session.
+ std::shared_ptr session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
- std::optional shm_provider_;
+ std::optional shm_provider_;
// Graph cache.
std::shared_ptr graph_cache_;
// ROS graph liveliness subscriber.
- z_owned_subscriber_t graph_subscriber_;
+ // The graph_subscriber *must* exist in order for anything in this Data class,
+ // and hence rmw_zenoh_cpp, to work.
+ // However, zenoh::Subscriber does not have an empty constructor,
+ // so just declaring this as a zenoh::Subscriber fails to compile.
+ // We work around that by wrapping it in a std::optional,
+ // so the std::optional gets constructed at Data constructor time,
+ // and then we initialize graph_subscriber_ later. Note that the zenoh-cpp API
+ // liveliness_declare_subscriber() throws an exception if it fails,
+ // so this should all be safe to do.
+ std::optional> graph_subscriber_;
// Equivalent to rmw_dds_common::Context's guard condition.
// Guard condition that should be triggered when the graph changes.
std::unique_ptr graph_guard_condition_;
@@ -425,37 +437,6 @@ class rmw_context_impl_s::Data final
std::unordered_map> nodes_;
};
-///=============================================================================
-static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data)
-{
- z_view_string_t keystr;
- z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
-
- auto data_ptr = static_cast(data);
- if (data_ptr == nullptr) {
- RMW_ZENOH_LOG_ERROR_NAMED(
- "rmw_zenoh_cpp",
- "[graph_sub_data_handler] Invalid data_ptr."
- );
- return;
- }
-
- // Look up the data shared_ptr in the global map. If it is in there, use it.
- // If not, it is being shutdown so we can just ignore this update.
- std::shared_ptr data_shared_ptr{nullptr};
- {
- std::lock_guard lk(data_to_data_shared_ptr_map_mutex);
- if (data_to_data_shared_ptr_map.count(data_ptr) == 0) {
- return;
- }
- data_shared_ptr = data_to_data_shared_ptr_map[data_ptr];
- }
-
- // Update the graph cache.
- std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr)));
- data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(liveliness_str));
-}
-
///=============================================================================
rmw_context_impl_s::rmw_context_impl_s(
const std::size_t domain_id,
@@ -480,13 +461,13 @@ std::string rmw_context_impl_s::enclave() const
}
///=============================================================================
-std::shared_ptr rmw_context_impl_s::session() const
+const std::shared_ptr rmw_context_impl_s::session() const
{
return data_->session();
}
///=============================================================================
-std::optional & rmw_context_impl_s::shm_provider()
+std::optional & rmw_context_impl_s::shm_provider()
{
return data_->shm_provider();
}
diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
index ee30b3e0..a2fdaf5e 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
@@ -15,13 +15,13 @@
#ifndef DETAIL__RMW_CONTEXT_IMPL_S_HPP_
#define DETAIL__RMW_CONTEXT_IMPL_S_HPP_
-#include
-
#include
#include
#include
#include
+#include
+
#include "graph_cache.hpp"
#include "rmw_node_data.hpp"
@@ -47,14 +47,14 @@ struct rmw_context_impl_s final
// Get a copy of the enclave.
std::string enclave() const;
- // Share the Zenoh session.
- std::shared_ptr session() const;
+ // Loan the Zenoh session.
+ const std::shared_ptr session() const;
// Get a reference to the shm_provider.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
- std::optional & shm_provider();
+ std::optional & shm_provider();
// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
diff --git a/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp b/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp
deleted file mode 100644
index 1574abc5..00000000
--- a/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2023 Open Source Robotics Foundation, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_
-#define DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_
-
-#include
-
-namespace rmw_zenoh_cpp
-{
-// TODO(YV): Consider using this again.
-struct rmw_init_options_impl_s
-{
- // An owned config.
- z_owned_config_t config;
-};
-} // namespace rmw_zenoh_cpp
-
-#endif // DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_
diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
index c61daa2a..ed1b5b2b 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
@@ -30,7 +30,7 @@ namespace rmw_zenoh_cpp
std::shared_ptr NodeData::make(
const rmw_node_t * const node,
std::size_t id,
- const z_loaned_session_t * session,
+ std::shared_ptr session,
std::size_t domain_id,
const std::string & namespace_,
const std::string & node_name,
@@ -38,7 +38,7 @@ std::shared_ptr NodeData::make(
{
// Create the entity.
auto entity = rmw_zenoh_cpp::liveliness::Entity::make(
- z_info_zid(session),
+ session->get_zid(),
std::to_string(id),
std::to_string(id),
rmw_zenoh_cpp::liveliness::EntityType::Node,
@@ -58,21 +58,18 @@ std::shared_ptr NodeData::make(
// Create the liveliness token.
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
- z_view_keyexpr_t liveliness_ke;
- z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
- z_owned_liveliness_token_t token;
- if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) {
+ zenoh::ZResult result;
+ auto token = session->liveliness_declare_token(
+ zenoh::KeyExpr(liveliness_keyexpr),
+ zenoh::Session::LivelinessDeclarationOptions::create_default(),
+ &result);
+ if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
return nullptr;
}
- auto free_token = rcpputils::make_scope_exit(
- [&token]() {
- z_drop(z_move(token));
- });
- free_token.cancel();
return std::shared_ptr(
new NodeData{
node,
@@ -87,7 +84,7 @@ NodeData::NodeData(
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr entity,
- z_owned_liveliness_token_t token)
+ zenoh::LivelinessToken token)
: node_(node),
id_(std::move(id)),
entity_(std::move(entity)),
@@ -121,7 +118,7 @@ std::size_t NodeData::id() const
///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
- std::shared_ptr session,
+ std::shared_ptr session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
@@ -143,7 +140,7 @@ bool NodeData::create_pub_data(
}
auto pub_data = PublisherData::make(
- std::move(session),
+ session,
node_,
entity_->node_info(),
id_,
@@ -187,7 +184,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
bool NodeData::create_sub_data(
const rmw_subscription_t * const subscription,
- std::shared_ptr session,
+ std::shared_ptr session,
std::shared_ptr graph_cache,
std::size_t id,
const std::string & topic_name,
@@ -210,7 +207,7 @@ bool NodeData::create_sub_data(
}
auto sub_data = SubscriptionData::make(
- std::move(session),
+ session,
std::move(graph_cache),
node_,
entity_->node_info(),
@@ -255,7 +252,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
///=============================================================================
bool NodeData::create_service_data(
const rmw_service_t * const service,
- std::shared_ptr session,
+ std::shared_ptr session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
@@ -277,7 +274,7 @@ bool NodeData::create_service_data(
}
auto service_data = ServiceData::make(
- std::move(session),
+ session,
node_,
entity_->node_info(),
id_,
@@ -322,7 +319,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service)
///=============================================================================
bool NodeData::create_client_data(
const rmw_client_t * const client,
- std::shared_ptr session,
+ std::shared_ptr session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
@@ -344,7 +341,7 @@ bool NodeData::create_client_data(
}
auto client_data = ClientData::make(
- std::move(session),
+ session,
node_,
client,
entity_->node_info(),
@@ -396,7 +393,14 @@ rmw_ret_t NodeData::shutdown()
}
// Unregister this node from the ROS graph.
- z_liveliness_undeclare_token(z_move(token_));
+ zenoh::ZResult result;
+ std::move(token_).value().undeclare(&result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Unable to undeclare liveliness token");
+ return RMW_RET_ERROR;
+ }
is_shutdown_ = true;
return ret;
diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
index 4429b589..e6fbede5 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
@@ -15,14 +15,14 @@
#ifndef DETAIL__RMW_NODE_DATA_HPP_
#define DETAIL__RMW_NODE_DATA_HPP_
-#include
-
#include
#include
#include
#include
#include
+#include
+
#include "graph_cache.hpp"
#include "liveliness_utils.hpp"
#include "rmw_client_data.hpp"
@@ -43,7 +43,7 @@ class NodeData final
static std::shared_ptr make(
const rmw_node_t * const node,
std::size_t id,
- const z_loaned_session_t * session,
+ std::shared_ptr session,
std::size_t domain_id,
const std::string & namespace_,
const std::string & node_name,
@@ -55,7 +55,7 @@ class NodeData final
// Create a new PublisherData for a given rmw_publisher_t.
bool create_pub_data(
const rmw_publisher_t * const publisher,
- std::shared_ptr sess,
+ std::shared_ptr session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
@@ -70,7 +70,7 @@ class NodeData final
// Create a new SubscriptionData for a given rmw_subscription_t.
bool create_sub_data(
const rmw_subscription_t * const subscription,
- std::shared_ptr sess,
+ std::shared_ptr session,
std::shared_ptr graph_cache,
std::size_t id,
const std::string & topic_name,
@@ -86,7 +86,7 @@ class NodeData final
// Create a new ServiceData for a given rmw_service_t.
bool create_service_data(
const rmw_service_t * const service,
- std::shared_ptr session,
+ std::shared_ptr session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
@@ -101,7 +101,7 @@ class NodeData final
// Create a new ClientData for a given rmw_client_t.
bool create_client_data(
const rmw_client_t * const client,
- std::shared_ptr session,
+ std::shared_ptr session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
@@ -128,7 +128,7 @@ class NodeData final
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr entity,
- z_owned_liveliness_token_t token);
+ zenoh::LivelinessToken token);
// Internal mutex.
mutable std::recursive_mutex mutex_;
// The rmw_node_t associated with this NodeData.
@@ -139,7 +139,7 @@ class NodeData final
// The Entity generated for the node.
std::shared_ptr entity_;
// Liveliness token for the node.
- z_owned_liveliness_token_t token_;
+ std::optional token_;
// Shutdown flag.
bool is_shutdown_;
// Map of publishers.
diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
index f54c50ab..b228c7cb 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
@@ -16,11 +16,13 @@
#include
+#include
#include
#include
#include
#include
#include
+#include
#include "cdr.hpp"
#include "rmw_context_impl_s.hpp"
@@ -42,7 +44,7 @@ namespace rmw_zenoh_cpp
///=============================================================================
std::shared_ptr PublisherData::make(
- std::shared_ptr session,
+ std::shared_ptr session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
@@ -82,7 +84,7 @@ std::shared_ptr PublisherData::make(
std::size_t domain_id = node_info.domain_id_;
auto entity = liveliness::Entity::make(
- z_info_zid(session->loan()),
+ session->get_zid(),
std::to_string(node_id),
std::to_string(publisher_id),
liveliness::EntityType::Publisher,
@@ -102,52 +104,31 @@ std::shared_ptr PublisherData::make(
return nullptr;
}
- std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_;
- z_view_keyexpr_t pub_ke;
- if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) {
- RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
- return nullptr;
- }
-
+ zenoh::ZResult result;
+ std::optional pub_cache;
+ zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
// Create a Publication Cache if durability is transient_local.
- std::optional pub_cache = std::nullopt;
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
- ze_publication_cache_options_t pub_cache_opts;
- ze_publication_cache_options_default(&pub_cache_opts);
+ zenoh::Session::PublicationCacheOptions pub_cache_opts =
+ zenoh::Session::PublicationCacheOptions::create_default();
+
pub_cache_opts.history = adapted_qos_profile.depth;
pub_cache_opts.queryable_complete = true;
- // Set the queryable_prefix to the session id so that querying subscribers can specify this
- // session id to obtain latest data from this specific publication caches when querying over
- // the same keyexpression.
- // When such a prefix is added to the PublicationCache, it listens to queries with this extra
- // prefix (allowing to be queried in a unique way), but still replies with the original
- // publications' key expressions.
+
std::string queryable_prefix = entity->zid();
- z_view_keyexpr_t prefix_ke;
- z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str());
- pub_cache_opts.queryable_prefix = z_loan(prefix_ke);
-
- ze_owned_publication_cache_t pub_cache_;
- if (ze_declare_publication_cache(
- session->loan(), &pub_cache_, z_loan(pub_ke), &pub_cache_opts))
- {
+ pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix);
+
+ pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &result);
+
+ if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
- pub_cache = pub_cache_;
}
- auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
- [&pub_cache]() {
- if (pub_cache.has_value()) {
- z_drop(z_move(pub_cache.value()));
- }
- });
// Set congestion_control to BLOCK if appropriate.
- z_publisher_options_t opts;
- z_publisher_options_default(&opts);
+ zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default();
opts.congestion_control = Z_CONGESTION_CONTROL_DROP;
-
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
opts.reliability = Z_RELIABILITY_RELIABLE;
@@ -155,40 +136,24 @@ std::shared_ptr PublisherData::make(
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
}
- z_owned_publisher_t pub;
- // TODO(clalancette): What happens if the key name is a valid but empty string?
- if (z_declare_publisher(
- session->loan(), &pub, z_loan(pub_ke), &opts) != Z_OK)
- {
+ auto pub = session->declare_publisher(pub_ke, std::move(opts), &result);
+
+ if (result != Z_OK) {
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}
- auto undeclare_z_publisher = rcpputils::make_scope_exit(
- [&pub]() {
- z_undeclare_publisher(z_move(pub));
- });
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
- z_view_keyexpr_t liveliness_ke;
- z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
- z_owned_liveliness_token_t token;
- if (z_liveliness_declare_token(
- session->loan(), &token, z_loan(liveliness_ke),
- NULL) != Z_OK)
- {
+ auto token = session->liveliness_declare_token(
+ zenoh::KeyExpr(liveliness_keyexpr),
+ zenoh::Session::LivelinessDeclarationOptions::create_default(),
+ &result);
+ if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
return nullptr;
}
- auto free_token = rcpputils::make_scope_exit(
- [&token]() {
- z_drop(z_move(token));
- });
-
- undeclare_z_publisher_cache.cancel();
- undeclare_z_publisher.cancel();
- free_token.cancel();
return std::shared_ptr(
new PublisherData{
@@ -207,10 +172,10 @@ std::shared_ptr PublisherData::make(
PublisherData::PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr entity,
- std::shared_ptr sess,
- z_owned_publisher_t pub,
- std::optional pub_cache,
- z_owned_liveliness_token_t token,
+ std::shared_ptr sess,
+ zenoh::Publisher pub,
+ std::optional pub_cache,
+ zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr type_support)
: rmw_node_(rmw_node),
@@ -230,7 +195,7 @@ PublisherData::PublisherData(
///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
- std::optional & shm_provider)
+ std::optional & /*shm_provider*/)
{
std::lock_guard lock(mutex_);
if (is_shutdown_) {
@@ -245,47 +210,20 @@ rmw_ret_t PublisherData::publish(
// To store serialized message byte array.
char * msg_bytes = nullptr;
- std::optional shmbuf = std::nullopt;
- auto always_free_shmbuf = rcpputils::make_scope_exit(
- [&shmbuf]() {
- if (shmbuf.has_value()) {
- z_drop(z_move(shmbuf.value()));
- }
- });
rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;
auto always_free_msg_bytes = rcpputils::make_scope_exit(
- [&msg_bytes, allocator, &shmbuf]() {
- if (msg_bytes && !shmbuf.has_value()) {
+ [&msg_bytes, allocator]() {
+ if (msg_bytes) {
allocator->deallocate(msg_bytes, allocator->state);
}
});
- // Get memory from SHM buffer if available.
- if (shm_provider.has_value()) {
- RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");
-
- auto provider = shm_provider.value();
- z_buf_layout_alloc_result_t alloc;
- // TODO(yuyuan): SHM, configure this
- z_alloc_alignment_t alignment = {5};
- z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);
-
- if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
- shmbuf = std::make_optional(alloc.buf);
- msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
- } else {
- // TODO(Yadunund): Should we revert to regular allocation and not return an error?
- RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
- return RMW_RET_ERROR;
- }
- } else {
- // Get memory from the allocator.
- msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state));
- RMW_CHECK_FOR_NULL_WITH_MSG(
- msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
- }
+ // Get memory from the allocator.
+ msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state));
+ RMW_CHECK_FOR_NULL_WITH_MSG(
+ msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
@@ -306,24 +244,21 @@ rmw_ret_t PublisherData::publish(
// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
- z_publisher_put_options_t options;
- z_publisher_put_options_default(&options);
- z_owned_bytes_t attachment;
- uint8_t local_gid[RMW_GID_STORAGE_SIZE];
- entity_->copy_gid(local_gid);
- create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid);
- options.attachment = z_move(attachment);
-
- z_owned_bytes_t payload;
- if (shmbuf.has_value()) {
- z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
- } else {
- z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length);
- }
-
- z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
- if (res != Z_OK) {
- if (res == Z_ESESSION_CLOSED) {
+ zenoh::ZResult result;
+ auto options = zenoh::Publisher::PutOptions::create_default();
+ options.attachment = create_map_and_set_sequence_num(
+ sequence_number_++,
+ entity_->copy_gid());
+
+ // TODO(ahcorde): shmbuf
+ std::vector raw_data(
+ reinterpret_cast(msg_bytes),
+ reinterpret_cast(msg_bytes) + data_length);
+ zenoh::Bytes payload(std::move(raw_data));
+
+ pub_.put(std::move(payload), std::move(options), &result);
+ if (result != Z_OK) {
+ if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"unable to publish message since the zenoh session is closed");
@@ -339,7 +274,7 @@ rmw_ret_t PublisherData::publish(
///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
- std::optional & /*shm_provider*/)
+ std::optional & /*shm_provider*/)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length);
@@ -355,19 +290,18 @@ rmw_ret_t PublisherData::publish_serialized_message(
// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
- z_publisher_put_options_t options;
- z_publisher_put_options_default(&options);
- uint8_t local_gid[RMW_GID_STORAGE_SIZE];
- entity_->copy_gid(local_gid);
- z_owned_bytes_t attachment;
- create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid);
- options.attachment = z_move(attachment);
- z_owned_bytes_t payload;
- z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length);
-
- z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
- if (res != Z_OK) {
- if (res == Z_ESESSION_CLOSED) {
+ zenoh::ZResult result;
+ auto options = zenoh::Publisher::PutOptions::create_default();
+ options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid());
+
+ std::vector raw_data(
+ serialized_message->buffer,
+ serialized_message->buffer + data_length);
+ zenoh::Bytes payload(std::move(raw_data));
+
+ pub_.put(std::move(payload), std::move(options), &result);
+ if (result != Z_OK) {
+ if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"unable to publish message since the zenoh session is closed");
@@ -376,7 +310,6 @@ rmw_ret_t PublisherData::publish_serialized_message(
return RMW_RET_ERROR;
}
}
-
return RMW_RET_OK;
}
@@ -394,11 +327,10 @@ liveliness::TopicInfo PublisherData::topic_info() const
return entity_->topic_info().value();
}
-///=============================================================================
-void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
+std::array PublisherData::copy_gid() const
{
std::lock_guard lock(mutex_);
- entity_->copy_gid(out_gid);
+ return entity_->copy_gid();
}
///=============================================================================
@@ -440,11 +372,21 @@ rmw_ret_t PublisherData::shutdown()
}
// Unregister this publisher from the ROS graph.
- z_liveliness_undeclare_token(z_move(token_));
- if (pub_cache_.has_value()) {
- ze_undeclare_publication_cache(z_move(pub_cache_.value()));
+ zenoh::ZResult result;
+ std::move(token_).value().undeclare(&result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Unable to undeclare liveliness token");
+ return RMW_RET_ERROR;
+ }
+ std::move(pub_).undeclare(&result);
+ if (result != Z_OK) {
+ RMW_ZENOH_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Unable to undeclare publisher");
+ return RMW_RET_ERROR;
}
- z_undeclare_publisher(z_move(pub_));
sess_.reset();
is_shutdown_ = true;
diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
index 1853ff8a..52c22523 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
@@ -15,8 +15,7 @@
#ifndef DETAIL__RMW_PUBLISHER_DATA_HPP_
#define DETAIL__RMW_PUBLISHER_DATA_HPP_
-#include
-
+#include
#include
#include
#include
@@ -24,6 +23,8 @@
#include