Skip to content

Commit

Permalink
client data
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 22, 2024
1 parent 97b2b62 commit eafe1da
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 236 deletions.
79 changes: 0 additions & 79 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,83 +83,4 @@ AttachmentData::AttachmentData(const zenoh::Bytes & attachment)
}
this->source_gid = deserializer.deserialize<std::vector<uint8_t>>();
}

attachment_data_t::attachment_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE])
{
sequence_number = _sequence_number;
source_timestamp = _source_timestamp;
memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE);
}

attachment_data_t::attachment_data_t(attachment_data_t && data)
{
sequence_number = std::move(data.sequence_number);
source_timestamp = std::move(data.source_timestamp);
memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE);
}

void attachment_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment)
{
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
}

attachment_data_t::attachment_data_t(const z_loaned_bytes_t * attachment)
{
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;

ze_deserializer_deserialize_string(&deserializer, &key);

// Deserialize the sequence_number
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "sequence_number")
{
throw std::runtime_error("sequence_number is not found in the attachment.");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number)) {
throw std::runtime_error("Failed to deserialize the sequence_number.");
}

// Deserialize the source_timestamp
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "source_timestamp")
{
throw std::runtime_error("source_timestamp is not found in the attachment");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp)) {
throw std::runtime_error("Failed to deserialize the source_timestamp.");
}

// Deserialize the source_gid
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") {
throw std::runtime_error("Invalid attachment: the key source_gid is not found");
}
z_drop(z_move(key));
z_owned_slice_t slice;
if (ze_deserializer_deserialize_slice(&deserializer, &slice)) {
throw std::runtime_error("Failed to deserialize the source_gid.");
}
if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
throw std::runtime_error("The length of source_gid mismatched.");
}
memcpy(this->source_gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
z_drop(z_move(slice));
}
} // namespace rmw_zenoh_cpp
17 changes: 0 additions & 17 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,6 @@ class AttachmentData final

zenoh::Bytes serialize_to_zbytes();
};

class attachment_data_t final
{
public:
explicit attachment_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]);
explicit attachment_data_t(const z_loaned_bytes_t *);
explicit attachment_data_t(attachment_data_t && data);

int64_t sequence_number;
int64_t source_timestamp;
uint8_t source_gid[RMW_GID_STORAGE_SIZE];

void serialize_to_zbytes(z_owned_bytes_t *);
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ATTACHMENT_HELPERS_HPP_
214 changes: 98 additions & 116 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,69 +33,6 @@
#include "rcpputils/scope_exit.hpp"
#include "rmw/error_handling.h"

namespace
{

///=============================================================================
void client_data_handler(z_loaned_reply_t * reply, void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_handler."
);
return;
}

if (client_data->is_shutdown()) {
return;
}

if (!z_reply_is_ok(reply)) {
const z_loaned_reply_err_t * err = z_reply_err(reply);
const z_loaned_bytes_t * err_payload = z_reply_err_payload(err);

z_owned_string_t err_str;
z_bytes_to_string(err_payload, &err_str);

RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
client_data->topic_info().topic_keyexpr_.c_str(),
static_cast<int>(z_string_len(z_loan(err_str))),
z_string_data(z_loan(err_str)));
z_drop(z_move(err_str));

return;
}

std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

z_owned_reply_t owned_reply;
z_reply_clone(&owned_reply, reply);
client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
}

///=============================================================================
void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_drop."
);
return;
}

client_data->decrement_in_flight_and_conditionally_remove();
}

} // namespace

namespace rmw_zenoh_cpp
{
///=============================================================================
Expand Down Expand Up @@ -309,47 +246,66 @@ rmw_ret_t ClientData::take_response(
std::unique_ptr<ZenohReply> latest_reply = std::move(reply_queue_.front());
reply_queue_.pop_front();

if (!latest_reply->get_sample().has_value()) {
auto & reply = latest_reply->get_sample();

if(!reply.is_ok())
{
RMW_SET_ERROR_MSG("invalid reply sample");
return RMW_RET_ERROR;
}
const z_loaned_sample_t * sample = latest_reply->get_sample().value();

const zenoh::Sample & sample = reply.get_ok();

// Object that manages the raw buffer
z_owned_slice_t payload;
z_bytes_to_slice(z_sample_payload(sample), &payload);
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(z_slice_data(z_loan(payload)))),
z_slice_len(z_loan(payload)));
auto & payload = sample.get_payload();
auto slice = payload.slice_iter().next();
if (slice.has_value()) {
const uint8_t * payload = slice.value().data;
const size_t payload_len = slice.value().len;

eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(const_cast<uint8_t *>(payload)), payload_len);

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
if (!response_type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_response,
response_type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS response");
return RMW_RET_ERROR;
}

// Object that serializes the data
rmw_zenoh_cpp::Cdr deser(fastbuffer);
if (!response_type_support_->deserialize_ros_message(
deser.get_cdr(),
ros_response,
response_type_support_impl_))
{
RMW_SET_ERROR_MSG("could not deserialize ROS response");
return RMW_RET_ERROR;
}
// Fill in the request_header
if (!sample.get_attachment().has_value())
{
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ClientData take_request attachment is empty");
return RMW_RET_ERROR;
}
rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get()));
request_header->request_id.sequence_number = attachment.sequence_number;
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp = attachment.source_timestamp;
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}
memcpy(request_header->request_id.writer_guid, attachment.source_gid.data(), RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();

// Fill in the request_header
rmw_zenoh_cpp::attachment_data_t attachment(z_sample_attachment(sample));
request_header->request_id.sequence_number = attachment.sequence_number;
if (request_header->request_id.sequence_number < 0) {
RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment");
return RMW_RET_ERROR;
}
request_header->source_timestamp = attachment.source_timestamp;
if (request_header->source_timestamp < 0) {
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
*taken = true;
} else {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"ClientData not able to get slice data");
return RMW_RET_ERROR;
}
memcpy(request_header->request_id.writer_guid, attachment.source_gid, RMW_GID_STORAGE_SIZE);
request_header->received_timestamp = latest_reply->get_received_timestamp();

z_drop(z_move(payload));
*taken = true;

return RMW_RET_OK;
}
Expand Down Expand Up @@ -402,16 +358,10 @@ rmw_ret_t ClientData::send_request(
*sequence_id = sequence_number_++;

// Send request
z_get_options_t opts;
z_get_options_default(&opts);
z_owned_bytes_t attachment;
zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default();
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
rmw_zenoh_cpp::create_map_and_set_sequence_num(
&attachment, *sequence_id,
local_gid);
opts.attachment = z_move(attachment);

opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid);
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
// this window, the queryable will return an invalid reply. However, it is common for actions,
Expand All @@ -421,24 +371,56 @@ rmw_ret_t ClientData::send_request(
// Latest consolidation guarantees unicity of replies for the same key expression,
// which optimizes bandwidth. The default is "None", which imples replies may come in any order
// and any number.
opts.consolidation = z_query_consolidation_latest();
opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE;

z_owned_bytes_t payload;
z_bytes_copy_from_buf(
&payload, reinterpret_cast<const uint8_t *>(request_bytes), data_length);
opts.payload = z_move(payload);
std::vector<uint8_t> raw_bytes(
reinterpret_cast<const uint8_t *>(request_bytes),
reinterpret_cast<const uint8_t *>(request_bytes) + data_length);
opts.payload = zenoh::Bytes(raw_bytes);

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
num_in_flight_++;
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
z_loan(keyexpr_.value()._0), "",
z_move(zn_closure_reply),
&opts);

std::weak_ptr<rmw_zenoh_cpp::ClientData> client_data = shared_from_this();
zenoh::ZResult err;
std::string parameters;
context_impl->session_cpp()->get(
keyexpr_.value(),
parameters,
[client_data](const zenoh::Reply& reply) {

if (!reply.is_ok()) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False Reason: %s",
reply.get_err().get_payload().as_string())
return;
}
const zenoh::Sample & sample = reply.get_ok();

auto sub_data = client_data.lock();
if (sub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain ClientData from data for %s.",
std::string(sample.get_keyexpr().as_string_view()));
return;
}

if (sub_data->is_shutdown()) {
return;
}

std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

sub_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
},
zenoh::closures::none,
std::move(opts),
&err);
return RMW_RET_OK;
}

Expand Down
Loading

0 comments on commit eafe1da

Please sign in to comment.