Improve service and subscription reliability (#92)
* Switch to unique_ptr for the query_queue.

This is not strictly required, but it does make ownership
clearer.

Signed-off-by: Chris Lalancette <[email protected]>

* Fix cpplint issues.

Signed-off-by: Chris Lalancette <[email protected]>

* Use unique_ptr for replies.

Signed-off-by: Chris Lalancette <[email protected]>

* Use custom classes to wrap the queries and replies.

This allows us to do RAII and drop things in all paths
properly.

Signed-off-by: Chris Lalancette <[email protected]>
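The RAII idea behind these wrapper classes can be sketched as follows. This is a minimal illustration with a hypothetical `raw_handle` resource standing in for the zenoh-c owned types (`z_owned_query_t`, `z_owned_reply_t`), not the actual `ZenohQuery`/`ZenohReply` implementation:

```cpp
#include <cassert>

// Hypothetical C-style handle standing in for a z_owned_* zenoh type.
struct raw_handle { bool valid; };
inline int g_live_handles = 0;
inline raw_handle acquire_handle() { ++g_live_handles; return raw_handle{true}; }
inline void release_handle(raw_handle * h)
{
  if (h->valid) { --g_live_handles; h->valid = false; }
}

// RAII wrapper: the destructor releases the handle on every exit path
// (early return, exception, normal scope exit), which is the property the
// commit relies on to avoid leaking cloned queries and replies.
class HandleWrapper final
{
public:
  HandleWrapper() : handle_(acquire_handle()) {}
  ~HandleWrapper() { release_handle(&handle_); }
  // Non-copyable so ownership stays unique, as with unique_ptr-held wrappers.
  HandleWrapper(const HandleWrapper &) = delete;
  HandleWrapper & operator=(const HandleWrapper &) = delete;

private:
  raw_handle handle_;
};
```

Held through a `std::unique_ptr` in a queue, the wrapper's destructor fires exactly once no matter which code path drops the entry.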

* Revamp rmw_shutdown and rmw_context_fini.

rmw_context_fini is called during the atexit handler,
so we can't do complex things like shutdown the session.
Instead, switch to shutting down the session during
rmw_shutdown.

Signed-off-by: Chris Lalancette <[email protected]>
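The shape of that split can be sketched like this (simplified stand-in class, not the rmw API): heavy teardown moves into an explicit, idempotent `shutdown()` that runs while the program is still fully alive, and the finalizer that runs in atexit context stays trivial:

```cpp
#include <cassert>

// Sketch: complex teardown (closing connections, joining threads) belongs in
// an explicit shutdown() call; the atexit-style finalizer only does trivial
// bookkeeping and tolerates shutdown() having already run.
class Session
{
public:
  void shutdown()
  {
    if (!shut_down_) {
      // e.g. close the network session, join background threads ...
      shut_down_ = true;
    }
  }

  // Called from the atexit-style finalizer: must stay simple and idempotent,
  // because global destruction order is no longer guaranteed at this point.
  void finalize() { finalized_ = true; }

  bool shut_down() const { return shut_down_; }
  bool finalized() const { return finalized_; }

private:
  bool shut_down_{false};
  bool finalized_{false};
};
```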

* Make sure to check z_reply_is_ok.

Signed-off-by: Chris Lalancette <[email protected]>
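The pattern is the usual "check validity before extracting the payload". A minimal sketch with simplified stand-in types (the real code checks `z_reply_is_ok` and surfaces the sample through `std::optional`):

```cpp
#include <cassert>
#include <optional>

// Simplified stand-ins: a reply either carries a sample or an error.
struct Sample { int value; };
struct Reply { bool ok; Sample sample; };

// Only hand out the sample when the reply is actually OK; callers are
// forced to handle the error case instead of reading garbage.
inline std::optional<Sample> get_sample(const Reply & reply)
{
  if (reply.ok) {
    return reply.sample;
  }
  return std::nullopt;
}
```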

* Add in header includes as needed.

Signed-off-by: Chris Lalancette <[email protected]>

* Remove a lot of debugging statements.

Signed-off-by: Chris Lalancette <[email protected]>

* Add in source timestamps to service requests.

Signed-off-by: Chris Lalancette <[email protected]>

* Add in source_timestamp for services.

Signed-off-by: Chris Lalancette <[email protected]>

* Run the constructor for rmw_publisher_data_t.

Signed-off-by: Chris Lalancette <[email protected]>

* Don't pass unnecessary argument into __rmw_take.

Signed-off-by: Chris Lalancette <[email protected]>

* Fix up memory leaks in clients, services, and the context.

Signed-off-by: Chris Lalancette <[email protected]>

* Fill in the received_timestamp.

Signed-off-by: Chris Lalancette <[email protected]>

* Add in a writer_guid to the client and service requests.

Signed-off-by: Chris Lalancette <[email protected]>

* Revamp rmw_wait.

There are more comments in the code, but in particular this
change makes sure we deal with data that is already ready,
rather than always waiting.  With this in place, services
seem to work as promised.

Signed-off-by: Chris Lalancette <[email protected]>
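The core of that fix is the standard "check before you block" wait pattern: test whether data is already queued, and only sleep on the condition variable when nothing is ready. A minimal sketch (simplified queue, not the rmw_wait implementation); the predicate overload of `wait_for` also guards against spurious wakeups:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

class ReadyQueue
{
public:
  void push(int v)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(v);
    }
    cv_.notify_one();
  }

  // Returns true as soon as data is available: the predicate is evaluated
  // before blocking, so data that arrived earlier is never missed.
  bool wait_for_data(std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    return cv_.wait_for(lock, timeout, [this]() { return !queue_.empty(); });
  }

private:
  std::deque<int> queue_;
  std::mutex mutex_;
  std::condition_variable cv_;
};
```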

* Switch which end of the deque subscriptions push and pop from.

Since it is a deque, it doesn't *really* matter whether
we push from the back and pull from the front, or push
from the front and pull from the back.  Switch to pushing
to the back and pulling from the front so we match what
the clients and services are doing.

Signed-off-by: Chris Lalancette <[email protected]>
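The agreed convention is plain FIFO: producers push to the back, consumers pop from the front, so messages come out in arrival order. A trivial sketch of the convention (not the actual message types):

```cpp
#include <deque>

// Producer side: new messages go to the back of the deque.
inline void enqueue(std::deque<int> & q, int msg) { q.push_back(msg); }

// Consumer side: the oldest message comes off the front.
inline int dequeue(std::deque<int> & q)
{
  int msg = q.front();
  q.pop_front();
  return msg;
}
```

Either end-pairing gives FIFO with a deque; the commit's point is that one consistent pairing across subscriptions, clients, and services is less error-prone (e.g. the oldest-message eviction in the subscription handler now drops from the same end the consumer reads from).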

* Remove unnecessary configuration file.

Signed-off-by: Chris Lalancette <[email protected]>

---------

Signed-off-by: Chris Lalancette <[email protected]>
clalancette authored Jan 19, 2024
1 parent 9823c65 commit 0e7a380
Showing 17 changed files with 485 additions and 309 deletions.
18 changes: 0 additions & 18 deletions rmw_zenoh_config.json5

This file was deleted.

44 changes: 5 additions & 39 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
@@ -111,8 +111,8 @@ void GraphCache::parse_put(const std::string & keyexpr)
return graph_node.clients_;
}
}(entity, graph_node, entity_desc);
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
// For the sake of reusing data structures and lookup functions, we treat publishers and
// clients as equivalent. Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;
@@ -172,15 +172,6 @@ void GraphCache::parse_put(const std::string & keyexpr)
topic_data_insertion.first->second->stats_.sub_count_ += sub_count;
}
}

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added %s on topic %s with type %s and qos %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
topic_info.qos_.c_str(),
graph_node.name_.c_str());
};

// Helper lambda to convert an Entity into a GraphNode.
@@ -213,10 +204,6 @@ void GraphCache::parse_put(const std::string & keyexpr)
NodeMap node_map = {
{entity.node_name(), make_graph_node(entity, *this)}};
graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map)));
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp", "Added node /%s to a new namespace %s in the graph.",
entity.node_name().c_str(),
entity.node_namespace().c_str());
return;
}

@@ -238,14 +225,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
// name but unique id.
NodeMap::iterator insertion_it =
ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this)));
if (insertion_it != ns_it->second.end()) {
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added a new node /%s with id %s to an existing namespace %s in the graph.",
entity.node_name().c_str(),
entity.id().c_str(),
entity.node_namespace().c_str());
} else {
if (insertion_it == ns_it->second.end()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to add a new node /%s with id %s an "
@@ -369,8 +349,8 @@ void GraphCache::parse_del(const std::string & keyexpr)
return graph_node.clients_;
}
}(entity, graph_node, entity_desc);
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
// For the sake of reusing data structures and lookup functions, we treat publishers and
// clients as equivalent. Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;
@@ -409,15 +389,6 @@ void GraphCache::parse_del(const std::string & keyexpr)

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
update_graph_topics(topic_info, entity.type(), pub_count, sub_count, graph_cache);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Removed %s on topic %s with type %s and qos %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
topic_info.qos_.c_str(),
graph_node.name_.c_str());
};

// Lock the graph mutex before accessing the graph.
@@ -484,11 +455,6 @@ void GraphCache::parse_del(const std::string & keyexpr)
remove_topics(graph_node->clients_, EntityType::Client, *this);
}
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Removed node /%s from the graph.",
entity.node_name().c_str()
);
return;
}

2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
@@ -61,7 +61,6 @@ struct TopicData
using TopicDataPtr = std::shared_ptr<TopicData>;

///=============================================================================
// TODO(Yadunund): Expand to services and clients.
struct GraphNode
{
std::string id_;
@@ -82,7 +81,6 @@ struct GraphNode
// Entries for service/client.
TopicMap clients_ = {};
TopicMap services_ = {};

};
using GraphNodePtr = std::shared_ptr<GraphNode>;

4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
@@ -348,7 +348,7 @@ bool PublishToken::put(
RCUTILS_SET_ERROR_MSG("invalid keyexpression generation for liveliness publication.");
return false;
}
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending PUT on %s", token.c_str());

z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);
if (z_put(z_loan(*session), z_keyexpr(token.c_str()), nullptr, 0, &options) < 0) {
@@ -379,7 +379,7 @@ bool PublishToken::del(
RCUTILS_SET_ERROR_MSG("invalid key-expression generation for liveliness publication.");
return false;
}
RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "Sending DELETE on %s", token.c_str());

const z_delete_options_t options = z_delete_options_default();
if (z_delete(z_loan(*session), z_loan(keyexpr), &options) < 0) {
RCUTILS_SET_ERROR_MSG("failed to delete liveliness key");
58 changes: 48 additions & 10 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
@@ -14,7 +14,9 @@

#include <zenoh.h>

#include <cstring>
#include <mutex>
#include <optional>
#include <utility>

#include "rcpputils/scope_exit.hpp"
@@ -63,12 +65,12 @@ void sub_data_handler(
sub_data->queue_depth,
z_loan(keystr));

std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.back());
std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.front());
z_drop(&old->payload);
sub_data->message_queue.pop_back();
sub_data->message_queue.pop_front();
}

sub_data->message_queue.emplace_front(
sub_data->message_queue.emplace_back(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id));
@@ -81,13 +83,24 @@
}
}

ZenohQuery::ZenohQuery(const z_query_t * query)
{
query_ = z_query_clone(query);
}

ZenohQuery::~ZenohQuery()
{
z_drop(z_move(query_));
}

const z_query_t ZenohQuery::get_query() const
{
return z_query_loan(&query_);
}

//==============================================================================
void service_data_handler(const z_query_t * query, void * data)
{
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"[service_data_handler] triggered"
);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
auto drop_keystr = rcpputils::make_scope_exit(
[&keystr]() {
@@ -108,7 +121,7 @@ void service_data_handler(const z_query_t * query, void * data)
// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
service_data->query_queue.push_back(z_query_clone(query));
service_data->query_queue.emplace_back(std::make_unique<ZenohQuery>(query));
}
{
// Since we added new data, trigger the guard condition if it is available
@@ -119,6 +132,31 @@
}
}

ZenohReply::ZenohReply(const z_owned_reply_t * reply)
{
reply_ = *reply;
}

ZenohReply::~ZenohReply()
{
z_reply_drop(z_move(reply_));
}

std::optional<z_sample_t> ZenohReply::get_sample() const
{
if (z_reply_is_ok(&reply_)) {
return z_reply_ok(&reply_);
}

return std::nullopt;
}

size_t rmw_client_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex);
return sequence_number++;
}

//==============================================================================
void client_data_handler(z_owned_reply_t * reply, void * data)
{
@@ -145,9 +183,9 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
return;
}
{
std::lock_guard<std::mutex> msg_lock(client_data->message_mutex);
std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(*reply);
client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
*reply = z_reply_null();
}
{
41 changes: 35 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
@@ -21,6 +21,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -139,10 +140,21 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data);

///==============================================================================

struct rmw_service_data_t
class ZenohQuery final
{
std::size_t get_new_uid();
public:
ZenohQuery(const z_query_t * query);

~ZenohQuery();

const z_query_t get_query() const;

private:
z_owned_query_t query_;
};

struct rmw_service_data_t
{
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

@@ -158,11 +170,11 @@ struct rmw_service_data_t
rmw_context_t * context;

// Deque to store the queries in the order they arrive.
std::deque<z_owned_query_t> query_queue;
std::deque<std::unique_ptr<ZenohQuery>> query_queue;
std::mutex query_queue_mutex;

// Map to store the sequence_number -> query_id
std::map<int64_t, z_owned_query_t> sequence_to_query_map;
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map;
std::mutex sequence_to_query_map_mutex;

std::mutex internal_mutex;
@@ -171,6 +183,19 @@

///==============================================================================

class ZenohReply final
{
public:
ZenohReply(const z_owned_reply_t * reply);

~ZenohReply();

std::optional<z_sample_t> get_sample() const;

private:
z_owned_reply_t reply_;
};

struct rmw_client_data_t
{
z_owned_keyexpr_t keyexpr;
@@ -179,8 +204,8 @@ struct rmw_client_data_t
// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::deque<z_owned_reply_t> replies;
std::mutex replies_mutex;
std::deque<std::unique_ptr<ZenohReply>> replies;

const void * request_type_support_impl;
const void * response_type_support_impl;
@@ -193,6 +218,10 @@ struct rmw_client_data_t
std::mutex internal_mutex;
std::condition_variable * condition{nullptr};

uint8_t client_guid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();
std::mutex sequence_number_mutex;
size_t sequence_number{1};
};

7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
@@ -50,19 +50,12 @@ rmw_ret_t get_z_config(z_owned_config_t * config)
if (zenoh_config_path[0] != '\0') {
// If the environment variable is set, try to read the configuration from the file.
*config = zc_config_from_file(zenoh_config_path);
RCUTILS_LOG_INFO_NAMED(
"ZenohConfiguration",
"Using zenoh configuration file pointed by '%s' envar: '%s'", kZenohConfigFileEnvVar,
zenoh_config_path);
} else {
// If the environment variable is not set use internal configuration
static const std::string path_to_config_folder =
ament_index_cpp::get_package_share_directory(rmw_zenoh_identifier) + "/config/";
const std::string default_zconfig_path = path_to_config_folder + kDefaultZenohConfigFileName;
*config = zc_config_from_file(default_zconfig_path.c_str());
RCUTILS_LOG_INFO_NAMED(
"ZenohConfiguration",
"Using default zenoh configuration file at '%s'", default_zconfig_path.c_str());
}

// Verify that the configuration is valid.
10 changes: 1 addition & 9 deletions rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp
@@ -56,15 +56,12 @@ rmw_ret_t zenoh_router_check(z_session_t session)
// Define callback
auto callback = [](const struct z_id_t * id, void * ctx) {
const std::string id_str = zid_to_str(*id);
RCUTILS_LOG_INFO_NAMED(
"ZenohRouterCheck",
"A Zenoh router connected to the session with id '%s'", id_str.c_str());
// Note: Callback is guaranteed to never be called
// concurrently according to z_info_routers_zid docstring
(*(static_cast<int *>(ctx)))++;
};

rmw_ret_t ret;
rmw_ret_t ret = RMW_RET_OK;
z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context);
if (z_info_routers_zid(session, z_move(router_callback))) {
RCUTILS_LOG_ERROR_NAMED(
@@ -77,11 +74,6 @@
"ZenohRouterCheck",
"No Zenoh router connected to the session");
ret = RMW_RET_ERROR;
} else {
RCUTILS_LOG_INFO_NAMED(
"ZenohRouterCheck",
"There are %d Zenoh routers connected to the session", context);
ret = RMW_RET_OK;
}
}

1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/rmw_event.cpp
@@ -16,6 +16,7 @@

#include "rmw/error_handling.h"
#include "rmw/event.h"
#include "rmw/types.h"

#include "detail/identifier.hpp"
