Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup services implementation #88

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 29 additions & 52 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
#include "rmw_data_types.hpp"

///==============================================================================

saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
void * data)
Expand Down Expand Up @@ -79,21 +79,23 @@ void sub_data_handler(
}


unsigned int rmw_service_data_t::get_new_uid()
//==============================================================================
std::size_t rmw_service_data_t::get_new_uid()
{
return client_count++;
}

void service_data_handler(const z_query_t * query, void * service_data)
//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));

auto rmw_service_data = static_cast<rmw_service_data_t *>(service_data);
if (rmw_service_data == nullptr) {
rmw_service_data_t * service_data = static_cast<rmw_service_data_t *>(data);
if (service_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_service_data_t from data for "
Expand All @@ -106,43 +108,31 @@ void service_data_handler(const z_query_t * query, void * service_data)

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(rmw_service_data->query_queue_mutex);

const unsigned int client_id = rmw_service_data->get_new_uid();
rmw_service_data->id_query_map.emplace(
std::make_pair(client_id, std::make_unique<z_owned_query_t>(z_query_clone(query))));
rmw_service_data->to_take.push_back(client_id);


std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
const std::size_t client_id = service_data->get_new_uid();
service_data->id_query_map.emplace(
std::make_pair(client_id, z_query_clone(query)));
service_data->to_take.push_back(client_id);
}
{
// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(rmw_service_data->internal_mutex);
if (rmw_service_data->condition != nullptr) {
rmw_service_data->condition->notify_one();
std::lock_guard<std::mutex> internal_lock(service_data->internal_mutex);
if (service_data->condition != nullptr) {
service_data->condition->notify_one();
}
}

z_drop(z_move(keystr));
}

void client_data_handler(z_owned_reply_t * reply, void * client_data)
//==============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
auto rmw_client_data = static_cast<rmw_client_data_t *>(client_data);
if (rmw_client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_client_data_t "
);
return;
}
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[client_data_handler] triggered for %s",
rmw_client_data->service_name
);
if (!z_check(*reply)) {
auto client_data = static_cast<rmw_client_data_t *>(data);
if (client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_check returned False"
"Unable to obtain client_data_t "
);
return;
}
Expand All @@ -160,29 +150,16 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data)
);
return;
}

z_sample_t sample = z_reply_ok(reply);

z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);

RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"[client_data_handler] keyexpr of sample: %s",
z_loan(keystr)
);

{
std::lock_guard<std::mutex> msg_lock(rmw_client_data->message_mutex);
rmw_client_data->message = std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id);
std::lock_guard<std::mutex> msg_lock(client_data->message_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(*reply);
*reply = z_reply_null();
}
{
std::lock_guard<std::mutex> internal_lock(rmw_client_data->internal_mutex);
if (rmw_client_data->condition != nullptr) {
rmw_client_data->condition->notify_one();
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
if (client_data->condition != nullptr) {
client_data->condition->notify_one();
}
}

z_reply_drop(reply);
z_drop(z_move(keystr));
}
15 changes: 7 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,13 @@ void service_data_handler(const z_query_t * query, void * service_data);

void client_data_handler(z_owned_reply_t * reply, void * client_data);


///==============================================================================

struct rmw_service_data_t
{
unsigned int get_new_uid();
std::size_t get_new_uid();

const char * keyexpr;
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

const void * request_type_support_impl;
Expand All @@ -157,27 +156,27 @@ struct rmw_service_data_t

// Map to store the query id and the query.
// The query handler is saved as it is needed to answer the query later on.
std::unordered_map<unsigned int, std::unique_ptr<z_owned_query_t>> id_query_map;
std::unordered_map<std::size_t, z_owned_query_t> id_query_map;
// The query id's of the queries that need to be processed.
std::deque<unsigned int> to_take;
std::deque<std::size_t> to_take;
std::mutex query_queue_mutex;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

unsigned int client_count{};
std::size_t client_count = 0;
};

///==============================================================================

struct rmw_client_data_t
{
const char * service_name;
z_owned_keyexpr_t keyexpr;

z_owned_closure_reply_t zn_closure_reply;

std::mutex message_mutex;
std::unique_ptr<saved_msg_data> message;
std::vector<z_owned_reply_t> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
Expand Down
Loading
Loading