Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve service and subscription reliability #92

Merged
merged 18 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions rmw_zenoh_config.json5

This file was deleted.

44 changes: 5 additions & 39 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void GraphCache::parse_put(const std::string & keyexpr)
return graph_node.clients_;
}
}(entity, graph_node, entity_desc);
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
// For the sake of reusing data structures and lookup functions, we treat publishers and
// clients are equivalent. Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;
Expand Down Expand Up @@ -172,15 +172,6 @@ void GraphCache::parse_put(const std::string & keyexpr)
topic_data_insertion.first->second->stats_.sub_count_ += sub_count;
}
}

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added %s on topic %s with type %s and qos %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
topic_info.qos_.c_str(),
graph_node.name_.c_str());
};

// Helper lambda to convert an Entity into a GraphNode.
Expand Down Expand Up @@ -213,10 +204,6 @@ void GraphCache::parse_put(const std::string & keyexpr)
NodeMap node_map = {
{entity.node_name(), make_graph_node(entity, *this)}};
graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map)));
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.",
entity.node_name().c_str(),
entity.node_namespace().c_str());
return;
}

Expand All @@ -238,14 +225,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
// name but unique id.
NodeMap::iterator insertion_it =
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this)));
if (insertion_it != ns_it->second.end()) {
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added a new node /%s with id %s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
} else {
if (insertion_it == ns_it->second.end()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an "
Expand Down Expand Up @@ -369,8 +349,8 @@ void GraphCache::parse_del(const std::string & keyexpr)
return graph_node.clients_;
}
}(entity, graph_node, entity_desc);
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
// For the sake of reusing data structures and lookup functions, we treat publishers and
// clients are equivalent. Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;
Expand Down Expand Up @@ -409,15 +389,6 @@ void GraphCache::parse_del(const std::string & keyexpr)

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Removed %s on topic %s with type %s and qos %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
topic_info.qos_.c_str(),
graph_node.name_.c_str());
};

// Lock the graph mutex before accessing the graph.
Expand Down Expand Up @@ -484,11 +455,6 @@ void GraphCache::parse_del(const std::string & keyexpr)
remove_topics(graph_node->clients_, EntityType::Client, *this);
}
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node /%s from the graph.",
entity.node_name().c_str()
);
return;
}

Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct TopicData
using TopicDataPtr = std::shared_ptr<TopicData>;

///=============================================================================
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{
std::string id_;
Expand All @@ -82,7 +81,6 @@ struct GraphNode
// Entires for service/client.
TopicMap clients_ = {};
TopicMap services_ = {};

};
using GraphNodePtr = std::shared_ptr<GraphNode>;

Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ bool PublishToken::put(
RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication.");
return false;
}
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str());

z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);
if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) {
Expand Down Expand Up @@ -379,7 +379,7 @@ bool PublishToken::del(
RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication.");
return false;
}
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str());

const z_delete_options_t options = z_delete_options_default();
if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) {
RCUTILS_SET_ERROR_MSG("failed to delete liveliness key");
Expand Down
58 changes: 48 additions & 10 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

#include <zenoh.h>

#include <cstring>
#include <mutex>
#include <optional>
#include <utility>

#include "rcpputils/scope_exit.hpp"
Expand Down Expand Up @@ -63,12 +65,12 @@ void sub_data_handler(
sub_data->queue_depth,
z_loan(keystr));

std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.back());
std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.front());
z_drop(&old->payload);
sub_data->message_queue.pop_back();
sub_data->message_queue.pop_front();
}

sub_data->message_queue.emplace_front(
sub_data->message_queue.emplace_back(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id));
Expand All @@ -81,13 +83,24 @@ void sub_data_handler(
}
}

ZenohQuery::ZenohQuery(const z_query_t * query)
{
query_ = z_query_clone(query);
}

ZenohQuery::~ZenohQuery()
{
z_drop(z_move(query_));
}

const z_query_t ZenohQuery::get_query() const
{
return z_query_loan(&query_);
}

//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
auto drop_keystr = rcpputils::make_scope_exit(
[&keystr]() {
Expand All @@ -108,7 +121,7 @@ void service_data_handler(const z_query_t * query, void * data)
// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
service_data->query_queue.push_back(z_query_clone(query));
service_data->query_queue.emplace_back(std::make_unique<ZenohQuery>(query));
}
{
// Since we added new data, trigger the guard condition if it is available
Expand All @@ -119,6 +132,31 @@ void service_data_handler(const z_query_t * query, void * data)
}
}

ZenohReply::ZenohReply(const z_owned_reply_t * reply)
{
reply_ = *reply;
}

ZenohReply::~ZenohReply()
{
z_reply_drop(z_move(reply_));
}

std::optional<z_sample_t> ZenohReply::get_sample() const
{
if (z_reply_is_ok(&reply_)) {
return z_reply_ok(&reply_);
}

return std::nullopt;
}

size_t rmw_client_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex);
return sequence_number++;
}

//==============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
Expand All @@ -145,9 +183,9 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}
{
std::lock_guard<std::mutex> msg_lock(client_data->message_mutex);
std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(*reply);
client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
*reply = z_reply_null();
}
{
Expand Down
41 changes: 35 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -139,10 +140,21 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data);

///==============================================================================

struct rmw_service_data_t
class ZenohQuery final
{
std::size_t get_new_uid();
public:
ZenohQuery(const z_query_t * query);

~ZenohQuery();

const z_query_t get_query() const;

private:
z_owned_query_t query_;
};

struct rmw_service_data_t
{
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

Expand All @@ -158,11 +170,11 @@ struct rmw_service_data_t
rmw_context_t * context;

// Deque to store the queries in the order they arrive.
std::deque<z_owned_query_t> query_queue;
std::deque<std::unique_ptr<ZenohQuery>> query_queue;
std::mutex query_queue_mutex;

// Map to store the sequence_number -> query_id
std::map<int64_t, z_owned_query_t> sequence_to_query_map;
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map;
std::mutex sequence_to_query_map_mutex;

std::mutex internal_mutex;
Expand All @@ -171,6 +183,19 @@ struct rmw_service_data_t

///==============================================================================

class ZenohReply final
{
public:
ZenohReply(const z_owned_reply_t * reply);

~ZenohReply();

std::optional<z_sample_t> get_sample() const;

private:
z_owned_reply_t reply_;
};

struct rmw_client_data_t
{
z_owned_keyexpr_t keyexpr;
Expand All @@ -179,8 +204,8 @@ struct rmw_client_data_t
// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::deque<z_owned_reply_t> replies;
std::mutex replies_mutex;
std::deque<std::unique_ptr<ZenohReply>> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
Expand All @@ -193,6 +218,10 @@ struct rmw_client_data_t
std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

uint8_t client_guid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();
std::mutex sequence_number_mutex;
size_t sequence_number{1};
};

Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,12 @@ rmw_ret_t get_z_config(z_owned_config_t * config)
if (zenoh_config_path[0] != '\0') {
// If the environment variable is set, try to read the configuration from the file.
*config = zc_config_from_file(zenoh_config_path);
RCUTILS_LOG_INFO_NAMED(
"ZenohConfiguration",
"Using zenoh configuration file pointed by '%s' envar: '%s'", kZenohConfigFileEnvVar,
zenoh_config_path);
} else {
// If the environment variable is not set use internal configuration
static const std::string path_to_config_folder =
ament_index_cpp::get_package_share_directory(rmw_zenoh_identifier) + "/config/";
const std::string default_zconfig_path = path_to_config_folder + kDefaultZenohConfigFileName;
*config = zc_config_from_file(default_zconfig_path.c_str());
RCUTILS_LOG_INFO_NAMED(
"ZenohConfiguration",
"Using default zenoh configuration file at '%s'", default_zconfig_path.c_str());
}

// Verify that the configuration is valid.
Expand Down
10 changes: 1 addition & 9 deletions rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ rmw_ret_t zenoh_router_check(z_session_t session)
// Define callback
auto callback = [](const struct z_id_t * id, void * ctx) {
const std::string id_str = zid_to_str(*id);
RCUTILS_LOG_INFO_NAMED(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that there is a zid_to_str() method defined here and also within liveliness_utils.cpp. I'll ticket it to be fixed later.

"ZenohRouterCheck",
"A Zenoh router connected to the session with id '%s'", id_str.c_str());
// Note: Callback is guaranteed to never be called
// concurrently according to z_info_routers_zid docstring
(*(static_cast<int *>(ctx)))++;
};

rmw_ret_t ret;
rmw_ret_t ret = RMW_RET_OK;
z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context);
if (z_info_routers_zid(session, z_move(router_callback))) {
RCUTILS_LOG_ERROR_NAMED(
Expand All @@ -77,11 +74,6 @@ rmw_ret_t zenoh_router_check(z_session_t session)
"ZenohRouterCheck",
"No Zenoh router connected to the session");
ret = RMW_RET_ERROR;
} else {
RCUTILS_LOG_INFO_NAMED(
"ZenohRouterCheck",
"There are %d Zenoh routers connected to the session", context);
ret = RMW_RET_OK;
}
}

Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "rmw/error_handling.h"
#include "rmw/event.h"
#include "rmw/types.h"

#include "detail/identifier.hpp"

Expand Down
Loading
Loading