Skip to content

Commit

Permalink
Switch to liveliness tokens (#67)
Browse files Browse the repository at this point in the history
* Switch to liveliness tokens

Signed-off-by: Yadunund <[email protected]>

* Use zc APIs instead of macros to resolve liveliness api issues

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored Nov 16, 2023
1 parent f92dd56 commit 80ee67d
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 74 deletions.
23 changes: 0 additions & 23 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -259,27 +259,4 @@
},
},

/// Plugins configurations
/// Plugins are only loaded if present in the configuration. When starting
/// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace.
plugins: {

/// Configure the storage manager plugin
storage_manager: {
/// Configure the storages supported by the volumes
storages: {
ros2_lv: {
/// Storages always need to know what set of keys they must work with. These sets are defined by a key expression.
key_expr: "@ros2_lv/**",
/// Storages also need to know which volume will be used to actually store their key-value pairs.
/// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient.
volume: "memory",
/// A complete storage advertises itself as containing all the known keys matching the configured key expression.
/// If not configured, complete defaults to false.
complete: "true",
},
},
},
},

}
15 changes: 10 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ struct rmw_context_impl_s
///==============================================================================
struct rmw_node_data_t
{
// TODO(yadunund): Add a GraphCache object.

// Map topic name to topic types.
std::unordered_set<std::unordered_set<std::string>> publishers;
std::unordered_set<std::unordered_set<std::string>> subscriptions;
// TODO(Yadunund): Do we need a token at the node level? Right now I have one
// for cases where a node may spin up but does not have any publishers or subscriptions.
// Liveliness token for the node.
zc_owned_liveliness_token_t token;
};

///==============================================================================
Expand All @@ -71,6 +70,9 @@ struct rmw_publisher_data_t
// An owned publisher.
z_owned_publisher_t pub;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
Expand Down Expand Up @@ -113,6 +115,9 @@ struct rmw_subscription_data_t
{
z_owned_subscriber_t sub;

// Liveliness token for the subscription.
zc_owned_liveliness_token_t token;

const void * type_support_impl;
const char * typesupport_identifier;
MessageTypeSupport * type_support;
Expand Down
33 changes: 23 additions & 10 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,22 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// Setup liveliness subscriptions for discovery.
const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id);

// Query the router to get graph information before this session was started.
// TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available.
// Query the router/liveliness participants to get graph information before this session was started.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Sending Query '%s' to fetch discovery data from router...",
"Sending Query '%s' to fetch discovery data...",
liveliness_str.c_str()
);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_get_options_t opts = z_get_options_default();
z_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
&opts); // here, the send is moved and will be dropped by zenoh when adequate
zc_liveliness_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()),
z_move(channel.send), NULL);
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// z_get_options_t opts = z_get_options_default();
// z_get(
// z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
// &opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
Expand All @@ -277,14 +281,23 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
liveliness_str.c_str()
);

auto sub_options = z_subscriber_options_default();
sub_options.reliability = Z_RELIABILITY_RELIABLE;
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// auto sub_options = z_subscriber_options_default();
// sub_options.reliability = Z_RELIABILITY_RELIABLE;
// context->impl->graph_subscriber = z_declare_subscriber(
// z_loan(context->impl->session),
// z_keyexpr(liveliness_str.c_str()),
// z_move(callback),
// &sub_options);
auto sub_options = zc_liveliness_subscriber_options_null();
z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl);
context->impl->graph_subscriber = z_declare_subscriber(
context->impl->graph_subscriber = zc_liveliness_declare_subscriber(
z_loan(context->impl->session),
z_keyexpr(liveliness_str.c_str()),
z_move(callback),
&sub_options);
zc_liveliness_subscriber_options_drop(z_move(sub_options));
auto undeclare_z_sub = rcpputils::make_scope_exit(
[context]() {
z_undeclare_subscriber(z_move(context->impl->graph_subscriber));
Expand Down
135 changes: 99 additions & 36 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <new>
#include <sstream>

#include <zenoh.h>

#include "detail/guard_condition.hpp"
#include "detail/graph_cache.hpp"
#include "detail/identifier.hpp"
Expand Down Expand Up @@ -180,19 +182,42 @@ rmw_create_node(
node->implementation_identifier = rmw_zenoh_identifier;
node->context = context;

// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a new node is in town
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::node(context->actual_domain_id, namespace_, name)
// const bool pub_result = PublishToken::put(
// &node->context->impl->session,
// GenerateToken::node(context->actual_domain_id, namespace_, name)
// );
// if (!pub_result) {
// return nullptr;
// }
// Initialize liveliness token for the node to advertise that a new node is in town.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
node_data->token = zc_liveliness_declare_token(
z_loan(node->context->impl->session),
z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()),
NULL
);
if (!pub_result) {
auto free_token = rcpputils::make_scope_exit(
[node]() {
if (node->data != nullptr) {
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
z_drop(z_move(node_data->token));
}
});
if (!zc_liveliness_token_check(&node_data->token)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
return nullptr;
}

free_node_data.cancel();
free_namespace.cancel();
free_name.cancel();
free_node.cancel();
free_token.cancel();
return node;
}

Expand All @@ -204,20 +229,27 @@ rmw_destroy_node(rmw_node_t * node)
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a node has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
);
if (!del_result) {
return RMW_RET_ERROR;
}
// const bool del_result = PublishToken::del(
// &node->context->impl->session,
// GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
// );
// if (!del_result) {
// return RMW_RET_ERROR;
// }

// Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
zc_liveliness_undeclare_token(z_move(node_data->token));

rcutils_allocator_t * allocator = &node->context->options.allocator;

Expand Down Expand Up @@ -516,19 +548,45 @@ rmw_create_publisher(
z_undeclare_publisher(z_move(publisher_data->pub));
});

// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a new publisher is in town
// TODO(Yadunund): Publish liveliness for the new publisher.
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable")
// const bool pub_result = PublishToken::put(
// &node->context->impl->session,
// GenerateToken::publisher(
// node->context->actual_domain_id,
// node->namespace_,
// node->name,
// rmw_publisher->topic_name,
// publisher_data->type_support->get_name(),
// "reliable")
// );
// if (!pub_result) {
// return nullptr;
// }
publisher_data->token = zc_liveliness_declare_token(
z_loan(node->context->impl->session),
z_keyexpr(
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable").c_str()),
NULL
);
if (!pub_result) {
auto free_token = rcpputils::make_scope_exit(
[publisher_data]() {
if (publisher_data != nullptr) {
z_drop(z_move(publisher_data->token));
}
});
if (!zc_liveliness_token_check(&publisher_data->token)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
return nullptr;
}

Expand All @@ -540,6 +598,7 @@ rmw_create_publisher(
node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);
});

free_token.cancel();
remove_from_graph_cache.cancel();
undeclare_z_publisher.cancel();
free_topic_name.cancel();
Expand Down Expand Up @@ -575,22 +634,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)

auto publisher_data = static_cast<rmw_publisher_data_t *>(publisher->data);
if (publisher_data != nullptr) {
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a publisher has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable"
)
);
if (!del_result) {
// TODO(Yadunund): Should this really return an error?
return RMW_RET_ERROR;
}
// const bool del_result = PublishToken::del(
// &node->context->impl->session,
// GenerateToken::publisher(
// node->context->actual_domain_id,
// node->namespace_,
// node->name,
// publisher->topic_name,
// publisher_data->type_support->get_name(),
// "reliable"
// )
// );
// if (!del_result) {
// // TODO(Yadunund): Should this really return an error?
// return RMW_RET_ERROR;
// }
zc_liveliness_undeclare_token(z_move(publisher_data->token));

node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);

RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
Expand Down

0 comments on commit 80ee67d

Please sign in to comment.