Skip to content

Commit

Permalink
Adds base support for services.
Browse files Browse the repository at this point in the history
Signed-off-by: Franco Cipollone <[email protected]>
  • Loading branch information
francocipollone committed Dec 21, 2023
1 parent d734bc7 commit 81b4f70
Show file tree
Hide file tree
Showing 3 changed files with 984 additions and 43 deletions.
95 changes: 95 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,98 @@ void sub_data_handler(

z_drop(z_move(keystr));
}

void service_data_handler(const z_query_t * query, void * service_data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));

auto rmw_service_data = static_cast<rmw_service_data_t *>(service_data);
if (rmw_service_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_service_data_t from data for "
"service for %s",
z_loan(keystr)
);
return;
}

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(rmw_service_data->query_queue_mutex);

const unsigned int client_id = rmw_service_data->get_new_uid();
rmw_service_data->id_query_map.emplace(
std::make_pair(client_id, std::make_unique<saved_queryable_data>(z_query_clone(query))));
rmw_service_data->to_take.push_back(client_id);


// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(rmw_service_data->internal_mutex);
if (rmw_service_data->condition != nullptr) {
rmw_service_data->condition->notify_one();
}
}

z_drop(z_move(keystr));
}

void client_data_handler(z_owned_reply_t * reply, void * client_data)
{
auto rmw_client_data = static_cast<rmw_client_data_t *>(client_data);
if (rmw_client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_client_data_t "
);
return;
}
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[client_data_handler] triggered for %s",
rmw_client_data->service_name
);
if (!z_reply_check(reply)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_check returned False"
);
return;
}
if (!z_reply_is_ok(reply)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False"
);
return;
}

z_sample_t sample = z_reply_ok(reply);

z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);

RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"[client_data_handler] keyexpr of sample: %s",
z_loan(keystr)
);

{
std::lock_guard<std::mutex> msg_lock(rmw_client_data->message_mutex);
rmw_client_data->message = std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id);
}
{
std::lock_guard<std::mutex> internal_lock(rmw_client_data->internal_mutex);
if (rmw_client_data->condition != nullptr) {
rmw_client_data->condition->notify_one();
}
}

z_reply_drop(reply);
z_drop(z_move(keystr));
}
81 changes: 81 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

Expand All @@ -31,6 +32,7 @@

#include "graph_cache.hpp"
#include "message_type_support.hpp"
#include "service_type_support.hpp"

/// Structs for various type erased data fields.

Expand Down Expand Up @@ -131,4 +133,83 @@ struct rmw_subscription_data_t
std::condition_variable * condition{nullptr};
};


///==============================================================================

// z_owned_closure_query_t
void service_data_handler(const z_query_t * query, void * service_data);

void client_data_handler(z_owned_reply_t * reply, void * client_data);


struct saved_queryable_data
{
explicit saved_queryable_data(z_owned_query_t query)
: query(query)
{
}

const z_owned_query_t query;
};

///==============================================================================

struct rmw_service_data_t
{
unsigned int get_new_uid()
{
return client_count++;
}

const char * zn_queryable_key;
z_owned_queryable_t zn_queryable;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

// Map to store the query id and the query.
// The query handler is saved as it is needed to answer the query later on.
std::unordered_map<unsigned int, std::unique_ptr<saved_queryable_data>> id_query_map;
// The query id's of the queries that need to be processed.
std::deque<unsigned int> to_take;
std::mutex query_queue_mutex;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

unsigned int client_count{};
};

///==============================================================================

struct rmw_client_data_t
{
const char * service_name;

// TODO(francocipollone): Remove this. For some reason if I remove this(not being even used) it
// ends up panicking when calling the service. Something is missing.
z_owned_reply_channel_t zn_reply_channel;
z_owned_closure_reply_t zn_closure_reply;


std::mutex message_mutex;
std::unique_ptr<saved_msg_data> message;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};
};

#endif // DETAIL__RMW_DATA_TYPES_HPP_
Loading

0 comments on commit 81b4f70

Please sign in to comment.