Skip to content

Commit

Permalink
Adds base support for services. (#86)
Browse files Browse the repository at this point in the history
* Adds base support for services.

Signed-off-by: Franco Cipollone <[email protected]>

* Addresses Yadu's comments.

Signed-off-by: Franco Cipollone <[email protected]>

* Addresses Yadu's comments.

Signed-off-by: Franco Cipollone <[email protected]>

* Fixes memory leak.

Signed-off-by: Franco Cipollone <[email protected]>

* Removes unnecessary declaration.

Signed-off-by: Franco Cipollone <[email protected]>

* Cleanup services implementation (#88)

* Rely on channels for sending requests

Signed-off-by: Yadunund <[email protected]>

* Revert to callback for client with fixes

Signed-off-by: Yadunund <[email protected]>

* Cleanup service cb

Signed-off-by: Yadunund <[email protected]>

* Style

Signed-off-by: Yadunund <[email protected]>

* Cleanup comments

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>

* Use anynomous space instead of static functions.

Signed-off-by: Franco Cipollone <[email protected]>

* Fix style.

Signed-off-by: Franco Cipollone <[email protected]>

* Use zero_allocate where needed.

Signed-off-by: Franco Cipollone <[email protected]>

* Use a scope_exit to drop the keystr.

This just makes sure we always clean it up.

Signed-off-by: Chris Lalancette <[email protected]>

* Rename find_type_support to find_message_type_support.

Signed-off-by: Chris Lalancette <[email protected]>

* Add error checking into ros_topic_name_to_zenoh_key

Signed-off-by: Chris Lalancette <[email protected]>

* Make sure to always free response_bytes.

Signed-off-by: Chris Lalancette <[email protected]>

* Remove unnecessary TODO comment.

Signed-off-by: Chris Lalancette <[email protected]>

* Remember to free request_bytes.

Signed-off-by: Chris Lalancette <[email protected]>

* Small change to take requests from the front of the deque.

Signed-off-by: Chris Lalancette <[email protected]>

* Initial work for attachments and sequence numbers.

Signed-off-by: Chris Lalancette <[email protected]>

* Add in error checking for getting attachments.

Signed-off-by: Chris Lalancette <[email protected]>

* More error checking for attachments.

Signed-off-by: Chris Lalancette <[email protected]>

* Further cleanup.

Signed-off-by: Chris Lalancette <[email protected]>

* Remove unnecessary (and incorrect) copy of sequence_number

Signed-off-by: Chris Lalancette <[email protected]>

* Style

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Franco Cipollone <[email protected]>
Signed-off-by: Yadunund <[email protected]>
Signed-off-by: Chris Lalancette <[email protected]>
Co-authored-by: Yadu <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2024
1 parent 0663919 commit b201d81
Show file tree
Hide file tree
Showing 4 changed files with 1,263 additions and 213 deletions.
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ void GraphCache::parse_put(const std::string & keyexpr)
} else {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an existing namespace %s in the graph. Report this bug.",
"Unable to add a new node /%s with id %s an "
"existing namespace %s in the graph. Report this bug.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
Expand Down
89 changes: 88 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,28 @@
#include <mutex>
#include <utility>

#include "rcpputils/scope_exit.hpp"
#include "rcutils/logging_macros.h"

#include "rmw_data_types.hpp"

///==============================================================================
saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
void * data)
{
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
auto drop_keystr = rcpputils::make_scope_exit(
[&keystr]() {
z_drop(z_move(keystr));
});

auto sub_data = static_cast<rmw_subscription_data_t *>(data);
if (sub_data == nullptr) {
Expand Down Expand Up @@ -67,6 +79,81 @@ void sub_data_handler(
sub_data->condition->notify_one();
}
}
}

//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
auto drop_keystr = rcpputils::make_scope_exit(
[&keystr]() {
z_drop(z_move(keystr));
});

rmw_service_data_t * service_data = static_cast<rmw_service_data_t *>(data);
if (service_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain rmw_service_data_t from data for "
"service for %s",
z_loan(keystr)
);
return;
}

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
service_data->query_queue.push_back(z_query_clone(query));
}
{
// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(service_data->internal_mutex);
if (service_data->condition != nullptr) {
service_data->condition->notify_one();
}
}
}

z_drop(z_move(keystr));
//==============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
auto client_data = static_cast<rmw_client_data_t *>(data);
if (client_data == nullptr) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
);
return;
}
if (!z_reply_check(reply)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_check returned False"
);
return;
}
if (!z_reply_is_ok(reply)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False"
);
return;
}
{
std::lock_guard<std::mutex> msg_lock(client_data->message_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(*reply);
*reply = z_reply_null();
}
{
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
if (client_data->condition != nullptr) {
client_data->condition->notify_one();
}
}
}
70 changes: 65 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "rcutils/allocator.h"

#include "rmw/rmw.h"

#include "graph_cache.hpp"
#include "message_type_support.hpp"
#include "service_type_support.hpp"

/// Structs for various type erased data fields.

Expand Down Expand Up @@ -97,11 +100,7 @@ void sub_data_handler(const z_sample_t * sample, void * sub_data);

struct saved_msg_data
{
explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}
explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]);

zc_owned_payload_t payload;
uint64_t recv_timestamp;
Expand Down Expand Up @@ -131,4 +130,65 @@ struct rmw_subscription_data_t
std::condition_variable * condition{nullptr};
};


///==============================================================================

void service_data_handler(const z_query_t * query, void * service_data);

void client_data_handler(z_owned_reply_t * reply, void * client_data);

///==============================================================================

struct rmw_service_data_t
{
std::size_t get_new_uid();

z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

// Deque to store the queries in the order they arrive.
std::deque<z_owned_query_t> query_queue;
std::mutex query_queue_mutex;

// Map to store the sequence_number -> query_id
std::map<int64_t, z_owned_query_t> sequence_to_query_map;
std::mutex sequence_to_query_map_mutex;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};
};

///==============================================================================

struct rmw_client_data_t
{
z_owned_keyexpr_t keyexpr;

z_owned_closure_reply_t zn_closure_reply;

std::mutex message_mutex;
std::deque<z_owned_reply_t> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
RequestTypeSupport * request_type_support;
ResponseTypeSupport * response_type_support;

rmw_context_t * context;

std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

size_t sequence_number{1};
};

#endif // DETAIL__RMW_DATA_TYPES_HPP_
Loading

0 comments on commit b201d81

Please sign in to comment.