Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to the C++ code #344

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <array>
#include <cstdlib>
#include <cstring>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <zenoh.hxx>

Expand All @@ -33,7 +32,7 @@ namespace rmw_zenoh_cpp
AttachmentData::AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const std::vector<uint8_t> source_gid)
const std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid)
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp),
source_gid_(source_gid),
Expand Down Expand Up @@ -62,7 +61,7 @@ int64_t AttachmentData::source_timestamp() const
}

///=============================================================================
std::vector<uint8_t> AttachmentData::copy_gid() const
std::array<uint8_t, RMW_GID_STORAGE_SIZE> AttachmentData::copy_gid() const
{
return source_gid_;
}
Expand Down Expand Up @@ -104,7 +103,7 @@ AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
if (source_gid_str != "source_gid") {
throw std::runtime_error("source_gid is not found in the attachment.");
}
this->source_gid_ = deserializer.deserialize<std::vector<uint8_t>>();
this->source_gid_ = deserializer.deserialize<std::array<uint8_t, RMW_GID_STORAGE_SIZE>>();
gid_hash_ = hash_gid(this->source_gid_);
}
} // namespace rmw_zenoh_cpp
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_

#include <array>
#include <cstdint>
#include <vector>

#include <zenoh.hxx>

Expand All @@ -31,22 +31,22 @@ class AttachmentData final
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const std::vector<uint8_t> source_gid);
const std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid);

explicit AttachmentData(const zenoh::Bytes & bytes);
explicit AttachmentData(AttachmentData && data);

int64_t sequence_number() const;
int64_t source_timestamp() const;
std::vector<uint8_t> copy_gid() const;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> copy_gid() const;
size_t gid_hash() const;

zenoh::Bytes serialize_to_zbytes();

private:
int64_t sequence_number_;
int64_t source_timestamp_;
std::vector<uint8_t> source_gid_;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid_;
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp
Expand Down
1 change: 0 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,6 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
}
}

memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE);
memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE);

endpoints.push_back(ep);
Expand Down
5 changes: 2 additions & 3 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ Entity::Entity(
// returned to the RMW layer as necessary.
simplified_XXH128_hash_t keyexpr_gid =
simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length());
this->gid_.resize(RMW_GID_STORAGE_SIZE);
memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64));
memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64,
sizeof(keyexpr_gid.high64));
Expand Down Expand Up @@ -632,7 +631,7 @@ std::string Entity::liveliness_keyexpr() const
}

///=============================================================================
std::vector<uint8_t> Entity::copy_gid() const
std::array<uint8_t, RMW_GID_STORAGE_SIZE> Entity::copy_gid() const
{
return gid_;
}
Expand Down Expand Up @@ -673,7 +672,7 @@ std::string demangle_name(const std::string & input)
} // namespace liveliness

///=============================================================================
size_t hash_gid(const std::vector<uint8_t> gid)
size_t hash_gid(const std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid)
{
std::stringstream hash_str;
hash_str << std::hex;
Expand Down
7 changes: 4 additions & 3 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef DETAIL__LIVELINESS_UTILS_HPP_
#define DETAIL__LIVELINESS_UTILS_HPP_

#include <array>
#include <cstddef>
#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -172,7 +173,7 @@ class Entity
// Two entities are equal if their keyexpr_hash are equal.
bool operator==(const Entity & other) const;

std::vector<uint8_t> copy_gid() const;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> copy_gid() const;

private:
Entity(
Expand All @@ -191,7 +192,7 @@ class Entity
NodeInfo node_info_;
std::optional<TopicInfo> topic_info_;
std::string liveliness_keyexpr_;
std::vector<uint8_t> gid_;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid_{};
};

///=============================================================================
Expand Down Expand Up @@ -234,7 +235,7 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
} // namespace liveliness

///=============================================================================
size_t hash_gid(const std::vector<uint8_t> gid);
size_t hash_gid(const std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid);
} // namespace rmw_zenoh_cpp

///=============================================================================
Expand Down
111 changes: 49 additions & 62 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <fastcdr/FastBuffer.h>

#include <array>
#include <chrono>
#include <cinttypes>
#include <limits>
Expand Down Expand Up @@ -141,11 +142,6 @@ std::shared_ptr<ClientData> ClientData::make(
response_type_support
});

if (!client_data->init()) {
// init() already set the error.
return nullptr;
}

return client_data;
}

Expand All @@ -171,12 +167,6 @@ ClientData::ClientData(
sequence_number_(1),
is_shutdown_(false),
initialized_(false)
{
// Do nothing.
}

///=============================================================================
bool ClientData::init()
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
keyexpr_ = zenoh::KeyExpr(topic_keyexpr);
Expand All @@ -190,12 +180,10 @@ bool ClientData::init()
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the client.");
return false;
throw std::runtime_error("Unable to create liveliness token for the client.");
}

initialized_ = true;

return true;
}

///=============================================================================
Expand All @@ -216,7 +204,7 @@ bool ClientData::liveliness_is_valid() const
}

///=============================================================================
std::vector<uint8_t> ClientData::copy_gid() const
std::array<uint8_t, RMW_GID_STORAGE_SIZE> ClientData::copy_gid() const
{
return entity_->copy_gid();
}
Expand Down Expand Up @@ -276,54 +264,55 @@ rmw_ret_t ClientData::take_response(
const zenoh::Sample & sample = reply.get_ok();

// Object that manages the raw buffer
auto payload = sample.get_payload().as_vector();
if (payload.size() > 0) {
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload.data())), payload.size());

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
if (!response_type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_response,
response_type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS response");
return RMW_RET_ERROR;
}

// Fill in the request_header
if (!sample.get_attachment().has_value()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ClientData take_request attachment is empty");
return RMW_RET_ERROR;
}
rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get()));
request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp = attachment.source_timestamp();
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
memcpy(
request_header->request_id.writer_guid,
attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();

*taken = true;
} else {
std::vector<uint8_t> payload = sample.get_payload().as_vector();
if (payload.size() == 0) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ClientData not able to get slice data");
return RMW_RET_ERROR;
}

// Fill in the request_header
if (!sample.get_attachment().has_value()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ClientData take_request attachment is empty");
return RMW_RET_ERROR;
}

eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload.data())), payload.size());

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
if (!response_type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_response,
response_type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS response");
return RMW_RET_ERROR;
}

rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get()));
request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp = attachment.source_timestamp();
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
memcpy(
request_header->request_id.writer_guid,
attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();

*taken = true;

return RMW_RET_OK;
}

Expand Down Expand Up @@ -376,8 +365,7 @@ rmw_ret_t ClientData::send_request(

// Send request
zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default();
std::vector<uint8_t> local_gid;
local_gid = entity_->copy_gid();
std::array<uint8_t, RMW_GID_STORAGE_SIZE> local_gid = entity_->copy_gid();
opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
Expand Down Expand Up @@ -425,11 +413,10 @@ rmw_ret_t ClientData::send_request(
return;
}

std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();

sub_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, now.time_since_epoch().count()));
},
zenoh::closures::none,
std::move(opts),
Expand Down
7 changes: 2 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef DETAIL__RMW_CLIENT_DATA_HPP_
#define DETAIL__RMW_CLIENT_DATA_HPP_

#include <array>
#include <cstddef>
#include <cstdint>
#include <deque>
Expand All @@ -23,7 +24,6 @@
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

#include <zenoh.hxx>

Expand Down Expand Up @@ -65,7 +65,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
bool liveliness_is_valid() const;

// Copy the GID of this ClientData into an rmw_gid_t.
std::vector<uint8_t> copy_gid() const;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> copy_gid() const;

// Add a new ZenohReply to the queue.
void add_new_reply(std::unique_ptr<rmw_zenoh_cpp::ZenohReply> reply);
Expand Down Expand Up @@ -114,9 +114,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init();

// Internal mutex.
mutable std::recursive_mutex mutex_;
// The parent node.
Expand Down
Loading
Loading