Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to liveliness tokens #67

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -259,27 +259,4 @@
},
},

/// Plugins configurations
/// Plugins are only loaded if present in the configuration. When starting
/// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace.
plugins: {

/// Configure the storage manager plugin
storage_manager: {
/// Configure the storages supported by the volumes
storages: {
ros2_lv: {
/// Storages always need to know what set of keys they must work with. These sets are defined by a key expression.
key_expr: "@ros2_lv/**",
/// Storages also need to know which volume will be used to actually store their key-value pairs.
/// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient.
volume: "memory",
/// A complete storage advertises itself as containing all the known keys matching the configured key expression.
/// If not configured, complete defaults to false.
complete: "true",
},
},
},
},

}
15 changes: 10 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ struct rmw_context_impl_s
///==============================================================================
struct rmw_node_data_t
{
// TODO(yadunund): Add a GraphCache object.

// Map topic name to topic types.
std::unordered_set<std::unordered_set<std::string>> publishers;
std::unordered_set<std::unordered_set<std::string>> subscriptions;
// TODO(Yadunund): Do we need a token at the node level? Right now I have one
// for cases where a node may spin up but does not have any publishers or subscriptions.
// Liveliness token for the node.
zc_owned_liveliness_token_t token;
};

///==============================================================================
Expand All @@ -71,6 +70,9 @@ struct rmw_publisher_data_t
// An owned publisher.
z_owned_publisher_t pub;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
Expand Down Expand Up @@ -113,6 +115,9 @@ struct rmw_subscription_data_t
{
z_owned_subscriber_t sub;

// Liveliness token for the subscription.
zc_owned_liveliness_token_t token;

const void * type_support_impl;
const char * typesupport_identifier;
MessageTypeSupport * type_support;
Expand Down
33 changes: 23 additions & 10 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,22 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// Setup liveliness subscriptions for discovery.
const std::string liveliness_str = GenerateToken::liveliness(context->actual_domain_id);

// Query the router to get graph information before this session was started.
// TODO(Yadunund): This will not be needed once the zenoh-c liveliness API is available.
// Query the router/liveliness participants to get graph information before this session was started.
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Sending Query '%s' to fetch discovery data from router...",
"Sending Query '%s' to fetch discovery data...",
liveliness_str.c_str()
);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_get_options_t opts = z_get_options_default();
z_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
&opts); // here, the send is moved and will be dropped by zenoh when adequate
zc_liveliness_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()),
z_move(channel.send), NULL);
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// z_get_options_t opts = z_get_options_default();
// z_get(
// z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send),
// &opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
Expand All @@ -277,14 +281,23 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
liveliness_str.c_str()
);

auto sub_options = z_subscriber_options_default();
sub_options.reliability = Z_RELIABILITY_RELIABLE;
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// auto sub_options = z_subscriber_options_default();
// sub_options.reliability = Z_RELIABILITY_RELIABLE;
// context->impl->graph_subscriber = z_declare_subscriber(
// z_loan(context->impl->session),
// z_keyexpr(liveliness_str.c_str()),
// z_move(callback),
// &sub_options);
auto sub_options = zc_liveliness_subscriber_options_null();
z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl);
context->impl->graph_subscriber = z_declare_subscriber(
context->impl->graph_subscriber = zc_liveliness_declare_subscriber(
z_loan(context->impl->session),
z_keyexpr(liveliness_str.c_str()),
z_move(callback),
&sub_options);
zc_liveliness_subscriber_options_drop(z_move(sub_options));
auto undeclare_z_sub = rcpputils::make_scope_exit(
[context]() {
z_undeclare_subscriber(z_move(context->impl->graph_subscriber));
Expand Down
135 changes: 99 additions & 36 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <new>
#include <sstream>

#include <zenoh.h>

#include "detail/guard_condition.hpp"
#include "detail/graph_cache.hpp"
#include "detail/identifier.hpp"
Expand Down Expand Up @@ -180,19 +182,42 @@ rmw_create_node(
node->implementation_identifier = rmw_zenoh_identifier;
node->context = context;

// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a new node is in town
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::node(context->actual_domain_id, namespace_, name)
// const bool pub_result = PublishToken::put(
// &node->context->impl->session,
// GenerateToken::node(context->actual_domain_id, namespace_, name)
// );
// if (!pub_result) {
// return nullptr;
// }
// Initialize liveliness token for the node to advertise that a new node is in town.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
node_data->token = zc_liveliness_declare_token(
z_loan(node->context->impl->session),
z_keyexpr(GenerateToken::node(context->actual_domain_id, namespace_, name).c_str()),
NULL
);
if (!pub_result) {
auto free_token = rcpputils::make_scope_exit(
[node]() {
if (node->data != nullptr) {
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
z_drop(z_move(node_data->token));
}
});
if (!zc_liveliness_token_check(&node_data->token)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the node.");
return nullptr;
}

free_node_data.cancel();
free_namespace.cancel();
free_name.cancel();
free_node.cancel();
free_token.cancel();
return node;
}

Expand All @@ -204,20 +229,27 @@ rmw_destroy_node(rmw_node_t * node)
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a node has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
);
if (!del_result) {
return RMW_RET_ERROR;
}
// const bool del_result = PublishToken::del(
// &node->context->impl->session,
// GenerateToken::node(node->context->actual_domain_id, node->namespace_, node->name)
// );
// if (!del_result) {
// return RMW_RET_ERROR;
// }

// Undeclare liveliness token for the node to advertise that the node has ridden off into the sunset.
rmw_node_data_t * node_data = static_cast<rmw_node_data_t *>(node->data);
zc_liveliness_undeclare_token(z_move(node_data->token));

rcutils_allocator_t * allocator = &node->context->options.allocator;

Expand Down Expand Up @@ -516,19 +548,45 @@ rmw_create_publisher(
z_undeclare_publisher(z_move(publisher_data->pub));
});

// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a new publisher is in town
// TODO(Yadunund): Publish liveliness for the new publisher.
const bool pub_result = PublishToken::put(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable")
// const bool pub_result = PublishToken::put(
// &node->context->impl->session,
// GenerateToken::publisher(
// node->context->actual_domain_id,
// node->namespace_,
// node->name,
// rmw_publisher->topic_name,
// publisher_data->type_support->get_name(),
// "reliable")
// );
// if (!pub_result) {
// return nullptr;
// }
publisher_data->token = zc_liveliness_declare_token(
z_loan(node->context->impl->session),
z_keyexpr(
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
rmw_publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable").c_str()),
NULL
);
if (!pub_result) {
auto free_token = rcpputils::make_scope_exit(
[publisher_data]() {
if (publisher_data != nullptr) {
z_drop(z_move(publisher_data->token));
}
});
if (!zc_liveliness_token_check(&publisher_data->token)) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create liveliness token for the publisher.");
return nullptr;
}

Expand All @@ -540,6 +598,7 @@ rmw_create_publisher(
node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);
});

free_token.cancel();
remove_from_graph_cache.cancel();
undeclare_z_publisher.cancel();
free_topic_name.cancel();
Expand Down Expand Up @@ -575,22 +634,26 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)

auto publisher_data = static_cast<rmw_publisher_data_t *>(publisher->data);
if (publisher_data != nullptr) {
// Uncomment and rely on #if #endif blocks to enable this feature when building with
// zenoh-pico since liveliness is only available in zenoh-c.
// Publish to the graph that a publisher has ridden off into the sunset
const bool del_result = PublishToken::del(
&node->context->impl->session,
GenerateToken::publisher(
node->context->actual_domain_id,
node->namespace_,
node->name,
publisher->topic_name,
publisher_data->type_support->get_name(),
"reliable"
)
);
if (!del_result) {
// TODO(Yadunund): Should this really return an error?
return RMW_RET_ERROR;
}
// const bool del_result = PublishToken::del(
// &node->context->impl->session,
// GenerateToken::publisher(
// node->context->actual_domain_id,
// node->namespace_,
// node->name,
// publisher->topic_name,
// publisher_data->type_support->get_name(),
// "reliable"
// )
// );
// if (!del_result) {
// // TODO(Yadunund): Should this really return an error?
// return RMW_RET_ERROR;
// }
zc_liveliness_undeclare_token(z_move(publisher_data->token));

node->context->impl->graph_cache.remove_publisher(publisher_data->graph_cache_handle);

RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, );
Expand Down
Loading