Skip to content

Commit

Permalink
Resolve merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 12, 2024
2 parents 8281049 + da73fdc commit 6367fd2
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 62 deletions.
12 changes: 1 addition & 11 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ void sub_data_handler(
}
}


//==============================================================================
std::size_t rmw_service_data_t::get_new_uid()
{
return client_count++;
}

//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
Expand Down Expand Up @@ -115,10 +108,7 @@ void service_data_handler(const z_query_t * query, void * data)
// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
const std::size_t client_id = service_data->get_new_uid();
service_data->id_query_map.emplace(
std::make_pair(client_id, z_query_clone(query)));
service_data->to_take.push_back(client_id);
service_data->query_queue.push_back(z_query_clone(query));
}
{
// Since we added new data, trigger the guard condition if it is available
Expand Down
18 changes: 9 additions & 9 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ struct rmw_subscription_data_t

///==============================================================================

// z_owned_closure_query_t
void service_data_handler(const z_query_t * query, void * service_data);

void client_data_handler(z_owned_reply_t * reply, void * client_data);
Expand All @@ -158,17 +157,16 @@ struct rmw_service_data_t

rmw_context_t * context;

// Map to store the query id and the query.
// The query handler is saved as it is needed to answer the query later on.
std::unordered_map<std::size_t, z_owned_query_t> id_query_map;
// The query id's of the queries that need to be processed.
std::deque<std::size_t> to_take;
// Deque to store the queries in the order they arrive.
std::deque<z_owned_query_t> query_queue;
std::mutex query_queue_mutex;

// Map to store the sequence_number -> query_id
std::map<int64_t, z_owned_query_t> sequence_to_query_map;
std::mutex sequence_to_query_map_mutex;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

std::size_t client_count = 0;
};

///==============================================================================
Expand All @@ -182,7 +180,7 @@ struct rmw_client_data_t
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::vector<z_owned_reply_t> replies;
std::deque<z_owned_reply_t> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
Expand All @@ -194,6 +192,8 @@ struct rmw_client_data_t

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

size_t sequence_number{1};
};

#endif // DETAIL__RMW_DATA_TYPES_HPP_
209 changes: 167 additions & 42 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <zenoh.h>

#include <chrono>
#include <cinttypes>
#include <mutex>
#include <new>
#include <sstream>
Expand Down Expand Up @@ -1486,7 +1487,7 @@ static rmw_ret_t __rmw_take(

std::unique_ptr<saved_msg_data> msg_data;
{
std::unique_lock<std::mutex> lock(sub_data->message_queue_mutex);
std::lock_guard<std::mutex> lock(sub_data->message_queue_mutex);

if (sub_data->message_queue.empty()) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
Expand Down Expand Up @@ -1957,6 +1958,26 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
return RMW_RET_OK;
}

static z_owned_bytes_map_t create_map_and_set_sequence_num(int64_t sequence_number)
{
z_owned_bytes_map_t map = z_bytes_map_new();
if (!z_check(map)) {
RMW_SET_ERROR_MSG("failed to allocate map for sequence number");
return z_bytes_map_null();
}

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char seq_id_str[20];
if (rcutils_snprintf(seq_id_str, 20, "%" PRId64, sequence_number) < 0) {
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
return z_bytes_map_null();
}
z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str));

return map;
}

//==============================================================================
/// Send a ROS service request.
rmw_ret_t
Expand Down Expand Up @@ -2025,11 +2046,24 @@ rmw_send_request(

size_t data_length = ser.getSerializedDataLength();

// TODO(francocipollone): Do I really need the sequency number here?
*sequence_id = 0;
// TODO(clalancette): Locking for multiple requests at the same time
*sequence_id = client_data->sequence_number++;

// Send request
z_get_options_t opts = z_get_options_default();

z_owned_bytes_map_t map = create_map_and_set_sequence_num(*sequence_id);
if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}
auto free_attachment_map = rcpputils::make_scope_exit(
[&map]() {
z_bytes_map_drop(z_move(map));
});

opts.attachment = z_bytes_map_as_attachment(&map);

opts.target = Z_QUERY_TARGET_ALL;
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);
Expand All @@ -2041,6 +2075,65 @@ rmw_send_request(
return RMW_RET_OK;
}

static int64_t get_sequence_num_from_attachment(const z_attachment_t * const attachment)
{
// Get the sequence_number out of the attachment
if (!z_check(*attachment)) {
// A valid request must have had an attachment
RMW_SET_ERROR_MSG("Could not get attachment from query");
return -1;
}

z_bytes_t sequence_num_index = z_attachment_get(*attachment, z_bytes_new("sequence_number"));
if (!z_check(sequence_num_index)) {
// A valid request must have had a sequence number attached
RMW_SET_ERROR_MSG("Could not get sequence number from query");
return -1;
}

if (sequence_num_index.len < 1) {
RMW_SET_ERROR_MSG("No value specified for the sequence number");
return -1;
}

if (sequence_num_index.len > 19) {
// The sequence number was larger than we expected
RMW_SET_ERROR_MSG("Sequence number too large");
return -1;
}

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char sequence_num_str[20];

memcpy(sequence_num_str, sequence_num_index.start, sequence_num_index.len);
sequence_num_str[sequence_num_index.len] = '\0';

errno = 0;
char * endptr;
int64_t seqnum = strtol(sequence_num_str, &endptr, 10);
if (seqnum == 0) {
// This is an error regardless; the client should never send this
RMW_SET_ERROR_MSG("A invalid zero value sent as the sequence number");
return -1;
} else if (endptr == sequence_num_str) {
// No values were converted, this is an error
RMW_SET_ERROR_MSG("No valid numbers available in the sequence number");
return -1;
} else if (*endptr != '\0') {
// There was junk after the number
RMW_SET_ERROR_MSG("Non-numeric values in the sequence number");
return -1;
} else if (errno != 0) {
// Some other error occurred, which may include overflow or underflow
RMW_SET_ERROR_MSG(
"An undefined error occurred while getting the sequence number, this may be an overflow");
return -1;
}

return seqnum;
}

//==============================================================================
/// Take an incoming ROS service response.
rmw_ret_t
Expand Down Expand Up @@ -2073,11 +2166,10 @@ rmw_take_response(

std::lock_guard<std::mutex> lock(client_data->message_mutex);
if (client_data->replies.empty()) {
// TODO(francocipollone): Verify behavior.
RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty");
return RMW_RET_ERROR;
}
latest_reply = &client_data->replies.back();
latest_reply = &client_data->replies.front();
z_sample_t sample = z_reply_ok(latest_reply);

// Object that manages the raw buffer
Expand All @@ -2099,15 +2191,18 @@ rmw_take_response(
return RMW_RET_ERROR;
}

*taken = true;

for (z_owned_reply_t & reply : client_data->replies) {
z_reply_drop(&reply);
request_header->request_id.sequence_number = get_sequence_num_from_attachment(&sample.attachment);
if (request_header->request_id.sequence_number < 0) {
// get_sequence_num_from_attachment already set an error
return RMW_RET_ERROR;
}
client_data->replies.clear();
// TODO(clalancette): We also need to fill in the source_timestamp, received_timestamp,
// and writer_guid

// TODO(francocipollone): Verify request_header information.
request_header->request_id.sequence_number = 0;
*taken = true;

client_data->replies.pop_front();
z_reply_drop(latest_reply);

return RMW_RET_OK;
}
Expand Down Expand Up @@ -2432,9 +2527,11 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
// CLEANUP ================================================================
z_drop(z_move(service_data->keyexpr));
z_drop(z_move(service_data->qable));
for (auto & id_query : service_data->id_query_map) {
z_drop(z_move(id_query.second));
for (z_owned_query_t & query : service_data->query_queue) {
z_drop(z_move(query));
}
service_data->query_queue.clear();
service_data->sequence_to_query_map.clear();
z_drop(z_move(service_data->token));

allocator->deallocate(service_data->request_type_support, allocator->state);
Expand Down Expand Up @@ -2477,24 +2574,40 @@ rmw_take_request(
RMW_CHECK_FOR_NULL_WITH_MSG(
service->data, "Unable to retrieve service_data from service", RMW_RET_INVALID_ARGUMENT);

std::unique_lock<std::mutex> lock(service_data->query_queue_mutex);
if (service_data->id_query_map.empty()) {
// TODO(francocipollone): Verify behavior.
RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] Take id_query_map is empty");
return RMW_RET_OK;
z_owned_query_t query;
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
if (service_data->query_queue.empty()) {
return RMW_RET_OK;
}
query = service_data->query_queue.front();
}
const std::size_t query_id = service_data->to_take.back();
auto query_it = service_data->id_query_map.find(query_id);
if (query_it == service_data->id_query_map.end()) {
RMW_SET_ERROR_MSG("Query id not found in id_query_map");

const z_query_t loaned_query = z_query_loan(&query);

// Get the sequence_number out of the attachment
z_attachment_t attachment = z_query_attachment(&loaned_query);

int64_t sequence_number = get_sequence_num_from_attachment(&attachment);
if (sequence_number < 0) {
// get_sequence_number_from_attachment already set the error
return RMW_RET_ERROR;
}
service_data->to_take.pop_back();
service_data->query_queue_mutex.unlock();

// Add this query to the map, so that rmw_send_response can quickly look it up later
{
std::lock_guard<std::mutex> lock(service_data->sequence_to_query_map_mutex);
if (service_data->sequence_to_query_map.find(sequence_number) !=
service_data->sequence_to_query_map.end())
{
RMW_SET_ERROR_MSG("duplicate sequence number in the map");
return RMW_RET_ERROR;
}
service_data->sequence_to_query_map.emplace(std::pair(sequence_number, query));
}

// DESERIALIZE MESSAGE ========================================================
const z_query_t z_loaned_query = z_query_loan(&query_it->second);
z_value_t payload_value = z_query_value(&z_loaned_query);
z_value_t payload_value = z_query_value(&loaned_query);

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(
Expand All @@ -2516,7 +2629,11 @@ rmw_take_request(
}

// Fill in the request header.
request_header->request_id.sequence_number = query_id;
// TODO(clalancette): We also need to fill in writer_guid, source_timestamp,
// and received_timestamp
request_header->request_id.sequence_number = sequence_number;

service_data->query_queue.pop_front();

*taken = true;

Expand Down Expand Up @@ -2589,28 +2706,36 @@ rmw_send_response(

size_t data_length = ser.getSerializedDataLength();

size_t meta_length = sizeof(request_header->sequence_number);
memcpy(
&response_bytes[data_length],
reinterpret_cast<char *>(&request_header->sequence_number),
meta_length);

// Create the queryable payload
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
auto query_it = service_data->id_query_map.find(request_header->sequence_number);
if (query_it == service_data->id_query_map.end()) {
std::lock_guard<std::mutex> lock(service_data->sequence_to_query_map_mutex);
auto query_it = service_data->sequence_to_query_map.find(request_header->sequence_number);
if (query_it == service_data->sequence_to_query_map.end()) {
RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug.");
return RMW_RET_ERROR;
}
const z_query_t z_loaned_query = z_query_loan(&query_it->second);
const z_query_t loaned_query = z_query_loan(&query_it->second);
z_query_reply_options_t options = z_query_reply_options_default();

// TODO(clalancette): We also need to fill in and send the writer_guid
z_owned_bytes_map_t map = create_map_and_set_sequence_num(request_header->sequence_number);
if (!z_check(map)) {
// create_map_and_set_sequence_num already set the error
return RMW_RET_ERROR;
}
auto free_attachment_map = rcpputils::make_scope_exit(
[&map]() {
z_bytes_map_drop(z_move(map));
});

options.attachment = z_bytes_map_as_attachment(&map);

options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);
z_query_reply(
&z_loaned_query, z_loan(service_data->keyexpr), reinterpret_cast<const uint8_t *>(
response_bytes), data_length + meta_length, &options);
&loaned_query, z_loan(service_data->keyexpr), reinterpret_cast<const uint8_t *>(
response_bytes), data_length, &options);

z_drop(z_move(query_it->second));
service_data->id_query_map.erase(query_it);
service_data->sequence_to_query_map.erase(query_it);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -2943,7 +3068,7 @@ rmw_wait(
auto serv_data = static_cast<rmw_service_data_t *>(services->services[i]);
if (serv_data != nullptr) {
serv_data->condition = nullptr;
if (serv_data->to_take.empty()) {
if (serv_data->query_queue.empty()) {
// Setting to nullptr lets rcl know that this service is not ready
services->services[i] = nullptr;
}
Expand Down

0 comments on commit 6367fd2

Please sign in to comment.