Skip to content

Commit

Permalink
Cleanup services implementation (#88)
Browse files Browse the repository at this point in the history
* Rely on channels for sending requests

Signed-off-by: Yadunund <[email protected]>

* Revert to callback for client with fixes

Signed-off-by: Yadunund <[email protected]>

* Cleanup service cb

Signed-off-by: Yadunund <[email protected]>

* Style

Signed-off-by: Yadunund <[email protected]>

* Cleanup comments

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Dec 28, 2023
1 parent b41bdab commit 62b5738
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 183 deletions.
81 changes: 29 additions & 52 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
#include "rmw_data_types.hpp"

///==============================================================================

saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
void * data)
Expand Down Expand Up @@ -79,21 +79,23 @@ void sub_data_handler(
}


unsigned int rmw_service_data_t::get_new_uid()
//==============================================================================
std::size_t rmw_service_data_t::get_new_uid()
{
return client_count++;
}

void service_data_handler(const z_query_t * query, void * service_data)
//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));

auto rmw_service_data = static_cast<rmw_service_data_t *>(service_data);
if (rmw_service_data == nullptr) {
rmw_service_data_t * service_data = static_cast<rmw_service_data_t *>(data);
if (service_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_service_data_t from data for "
Expand All @@ -106,43 +108,31 @@ void service_data_handler(const z_query_t * query, void * service_data)

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(rmw_service_data->query_queue_mutex);

const unsigned int client_id = rmw_service_data->get_new_uid();
rmw_service_data->id_query_map.emplace(
std::make_pair(client_id, std::make_unique<z_owned_query_t>(z_query_clone(query))));
rmw_service_data->to_take.push_back(client_id);


std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
const std::size_t client_id = service_data->get_new_uid();
service_data->id_query_map.emplace(
std::make_pair(client_id, z_query_clone(query)));
service_data->to_take.push_back(client_id);
}
{
// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(rmw_service_data->internal_mutex);
if (rmw_service_data->condition != nullptr) {
rmw_service_data->condition->notify_one();
std::lock_guard<std::mutex> internal_lock(service_data->internal_mutex);
if (service_data->condition != nullptr) {
service_data->condition->notify_one();
}
}

z_drop(z_move(keystr));
}

void client_data_handler(z_owned_reply_t * reply, void * client_data)
//==============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
auto rmw_client_data = static_cast<rmw_client_data_t *>(client_data);
if (rmw_client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_client_data_t "
);
return;
}
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[client_data_handler] triggered for %s",
rmw_client_data->service_name
);
if (!z_check(*reply)) {
auto client_data = static_cast<rmw_client_data_t *>(data);
if (client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_check returned False"
"Unable to obtain client_data_t "
);
return;
}
Expand All @@ -160,29 +150,16 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data)
);
return;
}

z_sample_t sample = z_reply_ok(reply);

z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);

RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"[client_data_handler] keyexpr of sample: %s",
z_loan(keystr)
);

{
std::lock_guard<std::mutex> msg_lock(rmw_client_data->message_mutex);
rmw_client_data->message = std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id);
std::lock_guard<std::mutex> msg_lock(client_data->message_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(*reply);
*reply = z_reply_null();
}
{
std::lock_guard<std::mutex> internal_lock(rmw_client_data->internal_mutex);
if (rmw_client_data->condition != nullptr) {
rmw_client_data->condition->notify_one();
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
if (client_data->condition != nullptr) {
client_data->condition->notify_one();
}
}

z_reply_drop(reply);
z_drop(z_move(keystr));
}
15 changes: 7 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,13 @@ void service_data_handler(const z_query_t * query, void * service_data);

void client_data_handler(z_owned_reply_t * reply, void * client_data);


///==============================================================================

struct rmw_service_data_t
{
unsigned int get_new_uid();
std::size_t get_new_uid();

const char * keyexpr;
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

const void * request_type_support_impl;
Expand All @@ -157,27 +156,27 @@ struct rmw_service_data_t

// Map to store the query id and the query.
// The query handler is saved as it is needed to answer the query later on.
std::unordered_map<unsigned int, std::unique_ptr<z_owned_query_t>> id_query_map;
std::unordered_map<std::size_t, z_owned_query_t> id_query_map;
// The query id's of the queries that need to be processed.
std::deque<unsigned int> to_take;
std::deque<std::size_t> to_take;
std::mutex query_queue_mutex;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

unsigned int client_count{};
std::size_t client_count = 0;
};

///==============================================================================

struct rmw_client_data_t
{
const char * service_name;
z_owned_keyexpr_t keyexpr;

z_owned_closure_reply_t zn_closure_reply;

std::mutex message_mutex;
std::unique_ptr<saved_msg_data> message;
std::vector<z_owned_reply_t> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
Expand Down
Loading

0 comments on commit 62b5738

Please sign in to comment.