Skip to content

Commit

Permalink
Added feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Dec 11, 2024
1 parent d4b9613 commit 8cd19bf
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 150 deletions.
18 changes: 9 additions & 9 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ namespace rmw_zenoh_cpp
{

AttachmentData::AttachmentData(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const std::vector<uint8_t> _source_gid)
const int64_t sequence_number,
const int64_t source_timestamp,
const std::vector<uint8_t> source_gid)
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp),
source_gid_(source_gid),
gid_hash_(hash_gid(source_gid))
{
sequence_number_ = _sequence_number;
source_timestamp_ = _source_timestamp;
source_gid_ = _source_gid;
gid_hash_ = hash_gid(source_gid_);
}

AttachmentData::AttachmentData(AttachmentData && data)
Expand Down Expand Up @@ -85,9 +85,9 @@ zenoh::Bytes AttachmentData::serialize_to_zbytes()
return std::move(serializer).finish();
}

AttachmentData::AttachmentData(const zenoh::Bytes & attachment)
AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
{
zenoh::ext::Deserializer deserializer(std::move(attachment));
zenoh::ext::Deserializer deserializer(std::move(bytes));
const auto sequence_number_str = deserializer.deserialize<std::string>();
if (sequence_number_str != "sequence_number") {
throw std::runtime_error("sequence_number is not found in the attachment.");
Expand Down
9 changes: 5 additions & 4 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
class AttachmentData final
{
public:
explicit AttachmentData(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const std::vector<uint8_t> _source_gid);
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const std::vector<uint8_t> source_gid);

explicit AttachmentData(const zenoh::Bytes & bytes);
explicit AttachmentData(AttachmentData && data);
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ TopicData::TopicData(ConstEntityPtr entity)
}

///=============================================================================
GraphCache::GraphCache(const std::string & zid)
: zid_str_(zid)
GraphCache::GraphCache(const zenoh::Id & zid)
: zid_str_(zid.to_string())
{
// Do nothing.
}
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/names_and_types.h"

#include <zenoh/api/id.hxx>

namespace rmw_zenoh_cpp
{
Expand Down Expand Up @@ -110,7 +111,7 @@ class GraphCache final
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
/// so that appropriate event callbacks may be triggered.
explicit GraphCache(const std::string & zid);
explicit GraphCache(const zenoh::Id & zid);

// Parse a PUT message over a token's key-expression and update the graph.
void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false);
Expand Down
34 changes: 18 additions & 16 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ bool ClientData::init(const std::shared_ptr<zenoh::Session> session)
keyexpr_ = zenoh::KeyExpr(topic_keyexpr);
// TODO(ahcorde) check KeyExpr
std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
zenoh::ZResult err;
zenoh::ZResult result;
this->token_ = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK) {
&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
Expand Down Expand Up @@ -280,14 +280,10 @@ rmw_ret_t ClientData::take_response(
const zenoh::Sample & sample = reply.get_ok();

// Object that manages the raw buffer
auto & payload = sample.get_payload();
auto slice = payload.slice_iter().next();
if (slice.has_value()) {
const uint8_t * payload = slice.value().data;
const size_t payload_len = slice.value().len;

auto payload = sample.get_payload().as_vector();
if (payload.size() > 0) {
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload)), payload_len);
reinterpret_cast<char *>(const_cast<uint8_t *>(payload.data())), payload.size());

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
Expand Down Expand Up @@ -401,10 +397,10 @@ rmw_ret_t ClientData::send_request(
std::vector<uint8_t> raw_bytes(
reinterpret_cast<const uint8_t *>(request_bytes),
reinterpret_cast<const uint8_t *>(request_bytes) + data_length);
opts.payload = zenoh::Bytes(raw_bytes);
opts.payload = zenoh::Bytes(std::move(raw_bytes));

std::weak_ptr<rmw_zenoh_cpp::ClientData> client_data = shared_from_this();
zenoh::ZResult err;
zenoh::ZResult result;
std::string parameters;
context_impl->session()->get(
keyexpr_.value(),
Expand Down Expand Up @@ -448,7 +444,13 @@ rmw_ret_t ClientData::send_request(
}
},
std::move(opts),
&err);
&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ClientData unable to call get");
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}

Expand Down Expand Up @@ -506,9 +508,9 @@ rmw_ret_t ClientData::shutdown()

// Unregister this node from the ROS graph.
if (initialized_) {
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK) {
zenoh::ZResult result;
std::move(token_).value().undeclare(&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
Expand Down
36 changes: 11 additions & 25 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ class rmw_context_impl_s::Data final
"Unable to connect to a Zenoh router. "
"Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?");
}
zenoh::ZResult err;
this->session_->get_routers_z_id(&err);
if (err != Z_OK) {
zenoh::ZResult result;
this->session_->get_routers_z_id(&result);
if (result != Z_OK) {
++connection_attempts;
} else {
ret = RMW_RET_OK;
Expand All @@ -127,7 +127,7 @@ class rmw_context_impl_s::Data final

// Initialize the graph cache.
graph_cache_ =
std::make_shared<rmw_zenoh_cpp::GraphCache>(this->session_->get_zid().to_string());
std::make_shared<rmw_zenoh_cpp::GraphCache>(this->session_->get_zid());
// Setup liveliness subscriptions for discovery.
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id);

Expand Down Expand Up @@ -155,14 +155,13 @@ class rmw_context_impl_s::Data final
options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL;
options.payload = "";

zenoh::ZResult err;
auto replies = session_->liveliness_get(
keyexpr,
zenoh::channels::FifoChannel(SIZE_MAX - 1),
zenoh::Session::LivelinessGetOptions::create_default(),
&err);
&result);

if (err != Z_OK) {
if (result != Z_OK) {
throw std::runtime_error("Error getting liveliness. ");
}

Expand Down Expand Up @@ -223,9 +222,9 @@ class rmw_context_impl_s::Data final
},
zenoh::closures::none,
std::move(sub_options),
&err);
&result);

if (err != Z_OK) {
if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh subscription");
throw std::runtime_error("Unable to subscribe to ROS graph updates.");
}
Expand All @@ -241,22 +240,9 @@ class rmw_context_impl_s::Data final
return ret;
}

// TODO(ahcorde): review this
// // Shutdown all the nodes in this context.
// for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
// ret = node_it->second->shutdown();
// if (ret != RMW_RET_OK) {
// RMW_ZENOH_LOG_ERROR_NAMED(
// "rmw_zenoh_cpp",
// "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
// node_it->second->id(),
// ret
// );
// }

zenoh::ZResult err;
std::move(graph_subscriber_cpp_).value().undeclare(&err);
if (err != Z_OK) {
zenoh::ZResult result;
std::move(graph_subscriber_cpp_).value().undeclare(&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
Expand Down
12 changes: 6 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ std::shared_ptr<NodeData> NodeData::make(

// Create the liveliness token.
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
zenoh::ZResult err;
zenoh::ZResult result;
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK) {
&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
Expand Down Expand Up @@ -393,9 +393,9 @@ rmw_ret_t NodeData::shutdown()
}

// Unregister this node from the ROS graph.
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK) {
zenoh::ZResult result;
std::move(token_).value().undeclare(&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
Expand Down
40 changes: 20 additions & 20 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

zenoh::ZResult err;
zenoh::ZResult result;
std::optional<zenoh::ext::PublicationCache> pub_cache;
zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
// Create a Publication Cache if durability is transient_local.
Expand All @@ -117,9 +117,9 @@ std::shared_ptr<PublisherData> PublisherData::make(
std::string queryable_prefix = entity->zid();
pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix);

pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &err);
pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &result);

if (err != Z_OK) {
if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
Expand All @@ -135,9 +135,9 @@ std::shared_ptr<PublisherData> PublisherData::make(
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
}
auto pub = session->declare_publisher(pub_ke, std::move(opts), &err);
auto pub = session->declare_publisher(pub_ke, std::move(opts), &result);

if (err != Z_OK) {
if (result != Z_OK) {
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}
Expand All @@ -146,8 +146,8 @@ std::shared_ptr<PublisherData> PublisherData::make(
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
if (err != Z_OK) {
&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
Expand Down Expand Up @@ -273,7 +273,7 @@ rmw_ret_t PublisherData::publish(
// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
zenoh::ZResult err;
zenoh::ZResult result;
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(
sequence_number_++,
Expand All @@ -285,9 +285,9 @@ rmw_ret_t PublisherData::publish(
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(raw_image);

pub_.put(std::move(payload), std::move(options), &err);
if (err != Z_OK) {
if (err == Z_ESESSION_CLOSED) {
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"unable to publish message since the zenoh session is closed");
Expand Down Expand Up @@ -319,7 +319,7 @@ rmw_ret_t PublisherData::publish_serialized_message(
// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
zenoh::ZResult err;
zenoh::ZResult result;
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid());

Expand All @@ -328,9 +328,9 @@ rmw_ret_t PublisherData::publish_serialized_message(
serialized_message->buffer + data_length);
zenoh::Bytes payload(raw_image);

pub_.put(std::move(payload), std::move(options), &err);
if (err != Z_OK) {
if (err == Z_ESESSION_CLOSED) {
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"unable to publish message since the zenoh session is closed");
Expand Down Expand Up @@ -408,16 +408,16 @@ rmw_ret_t PublisherData::shutdown()
}

// Unregister this publisher from the ROS graph.
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK) {
zenoh::ZResult result;
std::move(token_).value().undeclare(&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}
std::move(pub_).undeclare(&err);
if (err != Z_OK) {
std::move(pub_).undeclare(&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare publisher");
Expand Down
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class PublisherData final

// Copy the GID of this PublisherData into an rmw_gid_t.
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;

// Return a copy of the GID of this publisher.
std::vector<uint8_t> copy_gid() const;

// Returns true if liveliness token is still valid.
Expand Down
Loading

0 comments on commit 8cd19bf

Please sign in to comment.