Skip to content

Commit

Permalink
Added feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Dec 16, 2024
1 parent fbb49a0 commit 9e17da2
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 166 deletions.
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@ rmw_ret_t GraphCache::service_server_is_available(
service_it->second.find(client_topic_info.type_);
if (type_it != service_it->second.end()) {
for (const auto & [_, topic_data] : type_it->second) {
if (topic_data->subs_.size() > 0) {
if (!topic_data->subs_.empty()) {
*is_available = true;
return RMW_RET_OK;
}
Expand Down
17 changes: 0 additions & 17 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,6 @@ std::string Entity::liveliness_keyexpr() const
return this->liveliness_keyexpr_;
}

void Entity::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
memcpy(out_gid, gid_.data(), RMW_GID_STORAGE_SIZE);
}

///=============================================================================
std::vector<uint8_t> Entity::copy_gid() const
{
Expand Down Expand Up @@ -687,16 +682,4 @@ size_t hash_gid(const std::vector<uint8_t> gid)
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
size_t hash_gid_p(const uint8_t gid[RMW_GID_STORAGE_SIZE])
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(gid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}
} // namespace rmw_zenoh_cpp
5 changes: 0 additions & 5 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ class Entity
// Two entities are equal if their keyexpr_hash are equal.
bool operator==(const Entity & other) const;

void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;

std::vector<uint8_t> copy_gid() const;

private:
Expand Down Expand Up @@ -236,9 +234,6 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
} // namespace liveliness

///=============================================================================
/// Generate a hash for a given GID.
size_t hash_gid_p(const uint8_t gid[RMW_GID_STORAGE_SIZE]);

size_t hash_gid(const std::vector<uint8_t> gid);
} // namespace rmw_zenoh_cpp

Expand Down
10 changes: 1 addition & 9 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,15 +431,7 @@ rmw_ret_t ClientData::send_request(
sub_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
},
[client_data]() {
auto sub_data = client_data.lock();
if (sub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain ClientData");
return;
}
},
zenoh::closures::none,
std::move(opts),
&result);
if (result != Z_OK) {
Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,6 @@ liveliness::TopicInfo PublisherData::topic_info() const
return entity_->topic_info().value();
}

///=============================================================================
void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
{
std::lock_guard<std::mutex> lock(mutex_);
entity_->copy_gid(out_gid);
}

std::vector<uint8_t> PublisherData::copy_gid() const
{
std::lock_guard<std::mutex> lock(mutex_);
Expand Down
3 changes: 0 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ class PublisherData final
// Get a copy of the TopicInfo of this PublisherData.
liveliness::TopicInfo topic_info() const;

// Copy the GID of this PublisherData into an rmw_gid_t.
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;

// Return a copy of the GID of this publisher.
std::vector<uint8_t> copy_gid() const;

Expand Down
123 changes: 61 additions & 62 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,76 +292,75 @@ rmw_ret_t ServiceData::take_request(

auto payload_data = payload.value().get().as_vector();

if (payload_data.size() > 0) {
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload_data.data())), payload_data.size());

// Object that serializes the data
Cdr deser(fastbuffer);
if (!request_type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_request,
request_type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS message");
return RMW_RET_ERROR;
}
if (payload_data.empty()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ServiceData not able to get slice data");
return RMW_RET_ERROR;
}
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload_data.data())), payload_data.size());

// Fill in the request header.
// Get the sequence_number out of the attachment
if (!loaned_query.get_attachment().has_value()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ServiceData take_request attachment is empty");
return RMW_RET_ERROR;
}
// Object that serializes the data
Cdr deser(fastbuffer);
if (!request_type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_request,
request_type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS message");
return RMW_RET_ERROR;
}

rmw_zenoh_cpp::AttachmentData attachment(std::move(
loaned_query.get_attachment().value().get()));
// Fill in the request header.
// Get the sequence_number out of the attachment
if (!loaned_query.get_attachment().has_value()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ServiceData take_request attachment is empty");
return RMW_RET_ERROR;
}

request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
rmw_zenoh_cpp::AttachmentData attachment(std::move(
loaned_query.get_attachment().value().get()));

auto writter_gid_v = attachment.copy_gid();
memcpy(
request_header->request_id.writer_guid,
writter_gid_v.data(),
RMW_GID_STORAGE_SIZE);
request_header->request_id.sequence_number = attachment.sequence_number();
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}

request_header->source_timestamp = attachment.source_timestamp();
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
request_header->received_timestamp = query->get_received_timestamp();

// Add this query to the map, so that rmw_send_response can quickly look it up later.
const size_t hash = rmw_zenoh_cpp::hash_gid(writter_gid_v);
std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);
if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;
sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq)));
it = sequence_to_query_map_.find(hash);
} else {
// Client already in the map
if (it->second.find(request_header->request_id.sequence_number) != it->second.end()) {
RMW_SET_ERROR_MSG("duplicate sequence number in the map");
return RMW_RET_ERROR;
}
}
auto writter_gid_v = attachment.copy_gid();
memcpy(
request_header->request_id.writer_guid,
writter_gid_v.data(),
RMW_GID_STORAGE_SIZE);

it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query)));
*taken = true;
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ServiceData not able to get slice data");
request_header->source_timestamp = attachment.source_timestamp();
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
request_header->received_timestamp = query->get_received_timestamp();

// Add this query to the map, so that rmw_send_response can quickly look it up later.
const size_t hash = rmw_zenoh_cpp::hash_gid(writter_gid_v);
std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);
if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;
sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq)));
it = sequence_to_query_map_.find(hash);
} else {
// Client already in the map
if (it->second.find(request_header->request_id.sequence_number) != it->second.end()) {
RMW_SET_ERROR_MSG("duplicate sequence number in the map");
return RMW_RET_ERROR;
}
}

it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query)));
*taken = true;

return RMW_RET_OK;
}
Expand Down
122 changes: 60 additions & 62 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,43 +493,42 @@ rmw_ret_t SubscriptionData::take_one_message(

auto payload_data = msg_data->payload.as_vector();

if (payload_data.size() > 0) {
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload_data.data())),
payload_data.size());

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
if (!type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_message,
type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS message");
return RMW_RET_ERROR;
}

if (message_info != nullptr) {
message_info->source_timestamp = msg_data->attachment.source_timestamp();
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->attachment.sequence_number();
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(
message_info->publisher_gid.data,
msg_data->attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
}
*taken = true;
} else {
if (payload_data.empty()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"SubscriptionData not able to get slice data");
return RMW_RET_ERROR;
}
// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload_data.data())),
payload_data.size());

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
if (!type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_message,
type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS message");
return RMW_RET_ERROR;
}

if (message_info != nullptr) {
message_info->source_timestamp = msg_data->attachment.source_timestamp();
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->attachment.sequence_number();
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(
message_info->publisher_gid.data,
msg_data->attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
}
*taken = true;

return RMW_RET_OK;
}
Expand All @@ -552,41 +551,40 @@ rmw_ret_t SubscriptionData::take_serialized_message(

auto payload_data = msg_data->payload.as_vector();

if (payload_data.size() > 0) {
if (serialized_message->buffer_capacity < payload_data.size()) {
rmw_ret_t ret =
rmw_serialized_message_resize(serialized_message, payload_data.size());
if (ret != RMW_RET_OK) {
return ret; // Error message already set
}
}
serialized_message->buffer_length = payload_data.size();
memcpy(
serialized_message->buffer,
reinterpret_cast<char *>(const_cast<uint8_t *>(payload_data.data())),
payload_data.size());

*taken = true;

if (message_info != nullptr) {
message_info->source_timestamp = msg_data->attachment.source_timestamp();
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->attachment.sequence_number();
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(
message_info->publisher_gid.data,
msg_data->attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
}
} else {
if (payload_data.empty()) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"SubscriptionData not able to get slice data");
return RMW_RET_ERROR;
}
if (serialized_message->buffer_capacity < payload_data.size()) {
rmw_ret_t ret =
rmw_serialized_message_resize(serialized_message, payload_data.size());
if (ret != RMW_RET_OK) {
return ret; // Error message already set
}
}
serialized_message->buffer_length = payload_data.size();
memcpy(
serialized_message->buffer,
reinterpret_cast<char *>(const_cast<uint8_t *>(payload_data.data())),
payload_data.size());

*taken = true;

if (message_info != nullptr) {
message_info->source_timestamp = msg_data->attachment.source_timestamp();
message_info->received_timestamp = msg_data->recv_timestamp;
message_info->publication_sequence_number = msg_data->attachment.sequence_number();
// TODO(clalancette): fill in reception_sequence_number
message_info->reception_sequence_number = 0;
message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
memcpy(
message_info->publisher_gid.data,
msg_data->attachment.copy_gid().data(),
RMW_GID_STORAGE_SIZE);
message_info->from_intra_process = false;
}

return RMW_RET_OK;
}
Expand Down

0 comments on commit 9e17da2

Please sign in to comment.