Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Dec 12, 2024
1 parent bbb6d49 commit 0814156
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 17 deletions.
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ zenoh::Bytes AttachmentData::serialize_to_zbytes()

AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
{
zenoh::ext::Deserializer deserializer(std::move(bytes));
zenoh::ext::Deserializer deserializer(bytes);
const auto sequence_number_str = deserializer.deserialize<std::string>();
if (sequence_number_str != "sequence_number") {
throw std::runtime_error("sequence_number is not found in the attachment.");
Expand Down
8 changes: 3 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ std::shared_ptr<ClientData> ClientData::make(
response_type_support
});

if (!client_data->init(session)) {
if (!client_data->init()) {
// init() already set the error.
return nullptr;
}
Expand Down Expand Up @@ -176,14 +176,13 @@ ClientData::ClientData(
}

///=============================================================================
bool ClientData::init(const std::shared_ptr<zenoh::Session> session)
bool ClientData::init()
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
keyexpr_ = zenoh::KeyExpr(topic_keyexpr);
// TODO(ahcorde) check KeyExpr
std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr();
zenoh::ZResult result;
this->token_ = session->liveliness_declare_token(
this->token_ = sess_->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&result);
Expand All @@ -194,7 +193,6 @@ bool ClientData::init(const std::shared_ptr<zenoh::Session> session)
return false;
}

sess_ = session;
initialized_ = true;

return true;
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(const std::shared_ptr<zenoh::Session> session);
bool init();

// Internal mutex.
mutable std::recursive_mutex mutex_;
Expand Down
15 changes: 12 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class rmw_context_impl_s::Data final
zenoh::Session::LivelinessSubscriberOptions sub_options =
zenoh::Session::LivelinessSubscriberOptions::create_default();
sub_options.history = true;
graph_subscriber_cpp_ = session_->liveliness_declare_subscriber(
graph_subscriber_ = session_->liveliness_declare_subscriber(
keyexpr_cpp,
[&](const zenoh::Sample & sample) {
// Look up the data shared_ptr in the global map. If it is in there, use it.
Expand Down Expand Up @@ -244,7 +244,7 @@ class rmw_context_impl_s::Data final
}

zenoh::ZResult result;
std::move(graph_subscriber_cpp_).value().undeclare(&result);
std::move(graph_subscriber_).value().undeclare(&result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -418,7 +418,16 @@ class rmw_context_impl_s::Data final
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
std::optional<zenoh::Subscriber<void>> graph_subscriber_cpp_;
// The graph_subscriber *must* exist in order for anything in this Data class,
// and hence rmw_zenoh_cpp, to work.
// However, zenoh::Subscriber does not have an empty constructor,
// so just declaring this as a zenoh::Subscriber fails to compile.
// We work around that by wrapping it in a std::optional,
// so the std::optional gets constructed at Data constructor time,
// and then we initialize graph_subscriber_ later. Note that the zenoh-cpp API
// liveliness_declare_subscriber() throws an exception if it fails,
// so this should all be safe to do.
std::optional<zenoh::Subscriber<void>> graph_subscriber_;
// Equivalent to rmw_dds_common::Context's guard condition.
// Guard condition that should be triggered when the graph changes.
std::unique_ptr<rmw_guard_condition_t> graph_guard_condition_;
Expand Down
9 changes: 5 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ rmw_ret_t PublisherData::publish(
}
});

// TODO(anyone): Move this zenoh_cpp API
// Get memory from SHM buffer if available.
// if (shm_provider.has_value()) {
// RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");
Expand Down Expand Up @@ -280,10 +281,10 @@ rmw_ret_t PublisherData::publish(
entity_->copy_gid());

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_image(
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(raw_image);
zenoh::Bytes payload(std::move(raw_data));

pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
Expand Down Expand Up @@ -323,10 +324,10 @@ rmw_ret_t PublisherData::publish_serialized_message(
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid());

std::vector<uint8_t> raw_image(
std::vector<uint8_t> raw_data(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(raw_image);
zenoh::Bytes payload(std::move(raw_data));

pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
Expand Down
12 changes: 9 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ rmw_ret_t ServiceData::take_request(
return RMW_RET_ERROR;
}

auto writter_gid_v = attachment.copy_gid();
memcpy(
request_header->request_id.writer_guid,
attachment.copy_gid().data(),
writter_gid_v.data(),
RMW_GID_STORAGE_SIZE);

request_header->source_timestamp = attachment.source_timestamp();
Expand All @@ -339,7 +340,7 @@ rmw_ret_t ServiceData::take_request(
request_header->received_timestamp = query->get_received_timestamp();

// Add this query to the map, so that rmw_send_response can quickly look it up later.
const size_t hash = rmw_zenoh_cpp::hash_gid_p(request_header->request_id.writer_guid);
const size_t hash = rmw_zenoh_cpp::hash_gid(writter_gid_v);
std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);
if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;
Expand Down Expand Up @@ -378,8 +379,13 @@ rmw_ret_t ServiceData::send_response(
);
return RMW_RET_OK;
}

std::vector<uint8_t> writer_guid;
writer_guid.resize(RMW_GID_STORAGE_SIZE);
memcpy(writer_guid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE);

// Create the queryable payload
const size_t hash = hash_gid_p(request_id->writer_guid);
const size_t hash = hash_gid(writer_guid);
std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);
if (it == sequence_to_query_map_.end()) {
// If there is no data associated with this request, the higher layers of
Expand Down
16 changes: 16 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,24 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
// The keyexpr string.
std::string keyexpr_;
// An owned queryable.
// The Queryable *must* exist in order for anything in this ServiceData class,
// and hence rmw_zenoh_cpp, to work.
// However, zenoh::Queryable does not have an empty constructor,
// so just declaring this as a zenoh::Queryable fails to compile.
// We work around that by wrapping it in a std::optional, so the std::optional
// gets constructed at ServiceData constructor time,
// and then we initialize qable_ later. Note that the zenoh-cpp API declare_queryable() throws an
// exception if it fails, so this should all be safe to do.
std::optional<zenoh::Queryable<void>> qable_;
// Liveliness token for the service.
// The token_ *must* exist in order for anything in this ServiceData class,
// and hence rmw_zenoh_cpp, to work.
// However, zenoh::LivelinessToken does not have an empty constructor,
// so just declaring this as a zenoh::LivelinessToken fails to compile.
// We work around that by wrapping it in a std::optional, so the std::optional
// gets constructed at ServiceData constructor time,
// and then we initialize token_ later. Note that the zenoh-cpp API
// liveliness_declare_token() throws an exception if it fails, so this should all be safe to do.
std::optional<zenoh::LivelinessToken> token_;
// Type support fields.
const void * request_type_support_impl_;
Expand Down

0 comments on commit 0814156

Please sign in to comment.