Skip to content

Commit

Permalink
Merge branch 'rolling' into yadu/bump_zenoh_with_tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
Yadunund committed Apr 5, 2024
2 parents 063a850 + 87f20f8 commit 59ffcdd
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 30 deletions.
33 changes: 24 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,43 @@ ros2 run demo_nodes_cpp listener
The listener node should start receiving messages over the `/chatter` topic.

## Configuration
`rmw_zenoh` relies on separate configurations files to configure the Zenoh `router` and `session` respectively.
To understand more about `routers` and `sessions`, see [Zenoh documentation](https://zenoh.io/docs/getting-started/deployment/).

By default, `Zenoh sessions` created by `rmw_zenoh` will attempt to connect to a Zenoh router to receive discovery information.
To understand more about `Zenoh routers` and `Zenoh sessions`, see [Zenoh documentation](https://zenoh.io/docs/getting-started/deployment/).

### Checking for a Zenoh router.
The `ZENOH_ROUTER_CHECK_ATTEMPTS` environment variable can be used to configure if and how a `Zenoh session` checks for the presence of a `Zenoh router`.
The behavior is explained in the table below.


| ZENOH_ROUTER_CHECK_ATTEMPTS | Session behavior |
|:---------------------------:|:----------------------------------------------------------------------------------------------------------------:|
| unset or 0 | Indefinitely waits for connection to a Zenoh router. |
| < 0 | Skips Zenoh router check. |
| > 0 | Attempts to connect to a Zenoh router in `ZENOH_ROUTER_CHECK_ATTEMPTS` attempts with 1 second wait between checks. |

### Session and Router configs
`rmw_zenoh` relies on separate configurations files to configure the `Zenoh router` and `Zenoh session` respectively.
For more information on the topology of Zenoh adopted in `rmw_zenoh`, please see [Design](#design).
Default configuration files are used by `rmw_zenoh` however certain environment variables may be set to provide absolute paths to custom configuration files.
The table below summarizes the default files and the environment variables for the `router` and `session`.
The table below summarizes the default files and the environment variables for the `Zenoh router` and `Zenoh session`.
For a complete list of configurable parameters, see [zenoh/DEFAULT_CONFIG.json5](https://github.com/eclipse-zenoh/zenoh/blob/main/DEFAULT_CONFIG.json5).

| | Default config | Envar for custom config |
|---------|:----------------------------------------------------------------------------------------------------:|:--------------------------:|
| Router | [DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5](rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5) | `ZENOH_ROUTER_CONFIG_URI` |
| Session | [DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5](rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5) | `ZENOH_SESSION_CONFIG_URI` |

For example, to set the path to a custom `router` configuration file,
For example, to set the path to a custom `Zenoh router` configuration file,
```bash
export ZENOH_ROUTER_CONFIG_URI=$HOME/MY_ZENOH_ROUTER_CONFIG.json5
```

### Connecting multiple hosts
By default, all discovery traffic is local per host, where the host is the PC running a Zenoh `router`.
To bridge communications across two hosts, the `router` configuration for one the hosts must be updated to connect to the other `router` at startup.
This is done by specifying an endpoint in host's `router` configuration file to as seen below.
In this example, the `router` will connect to the `router` running on a second host with IP address `192.168.1.1` and port `7447`.
By default, all discovery traffic is local per host, where the host is the PC running a `Zenoh router`.
To bridge communications across two hosts, the `Zenoh router` configuration for one the hosts must be updated to connect to the other `Zenoh router` at startup.
This is done by specifying an endpoint in host's `Zenoh router` configuration file to as seen below.
In this example, the `Zenoh router` will connect to the `Zenoh router` running on a second host with IP address `192.168.1.1` and port `7447`.

```json
{
Expand All @@ -99,4 +114,4 @@ In this example, the `router` will connect to the `router` running on a second h
}
```

> Note: To connect multiple hosts, include the endpoints of all routers in the network.
> Note: To connect multiple hosts, include the endpoints of all `Zenoh routers` in the network.
3 changes: 3 additions & 0 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<buildtool_depend>ament_cmake</buildtool_depend>

<build_depend>zenoh_c_vendor</build_depend>
<build_export_depend>zenoh_c_vendor</build_export_depend>

<depend>ament_index_cpp</depend>
<depend>fastcdr</depend>
Expand All @@ -23,6 +24,8 @@
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

<member_of_group>rmw_implementation_packages</member_of_group>

<export>
<build_type>ament_cmake</build_type>
</export>
Expand Down
34 changes: 34 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <rcutils/env.h>
#include <rcutils/logging_macros.h>

#include <limits>
#include <string>

#include <ament_index_cpp/get_package_share_directory.hpp>
Expand All @@ -35,6 +36,8 @@ static const std::unordered_map<ConfigurableEntity,
{ConfigurableEntity::Router, {"ZENOH_ROUTER_CONFIG_URI", "DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5"}}
};

static const char * router_check_attempts_envar = "ZENOH_ROUTER_CHECK_ATTEMPTS";

rmw_ret_t _get_z_config(
const char * envar_name,
const char * default_uri,
Expand Down Expand Up @@ -89,3 +92,34 @@ rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * con

return _get_z_config(envar_map_it->second.first, default_config_path.c_str(), config);
}

///==============================================================================
std::optional<uint64_t> zenoh_router_check_attempts()
{
const char * envar_value;
// The default value is to check indefinitely.
uint64_t default_value = std::numeric_limits<uint64_t>::max();

if (NULL != rcutils_get_env(router_check_attempts_envar, &envar_value)) {
// NULL is returned if everything is ok.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Envar %s cannot be read. Report this bug.",
router_check_attempts_envar);
return default_value;
}
// If the environment variable contains a value, handle it accordingly.
if (envar_value[0] != '\0') {
const auto read_value = std::strtol(envar_value, nullptr, 10);
if (read_value > 0) {
return read_value;
} else if (read_value < 0) {
// If less than 0, we skip the check.
return std::nullopt;
}
// If the value is 0, check indefinitely.
return default_value;
}

// If unset, check indefinitely.
return default_value;
}
12 changes: 12 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <zenoh.h>

#include <optional>
#include <unordered_map>
#include <utility>

Expand All @@ -43,4 +44,15 @@ enum class ConfigurableEntity : uint8_t
[[nodiscard]]
rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * config);

///==============================================================================
/// Get the number of times rmw_init should try to connect to a zenoh router
/// based on the environment variable ZENOH_ROUTER_CHECK_ATTEMPTS.
/// @details The behavior is as follows:
/// - If not set or 0, the max value is returned.
/// - If less than 0, std::nullopt is returned.
/// - Else value of environemnt variable is returned.
/// @return The number of times to try connecting to a zenoh router and
/// std::nullopt if establishing a connection to a router is not required.
std::optional<uint64_t> zenoh_router_check_attempts();

#endif // DETAIL__ZENOH_CONFIG_HPP_
16 changes: 11 additions & 5 deletions rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,38 @@
#include <sstream>
#include <string>

#include "liveliness_utils.hpp"

///=============================================================================
rmw_ret_t zenoh_router_check(z_session_t session)
{
// Initialize context for callback
int context = 0;

// Define callback
auto callback = [](const struct z_id_t * id, void * ctx) {
const std::string id_str = liveliness::zid_to_str(*id);
RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Successfully connected to a Zenoh router with id %s.", id_str.c_str());
// Note: Callback is guaranteed to never be called
// concurrently according to z_info_routers_zid docstring
static_cast<void>(id);
(*(static_cast<int *>(ctx)))++;
};

rmw_ret_t ret = RMW_RET_OK;
z_owned_closure_zid_t router_callback = z_closure(callback, nullptr /* drop */, &context);
if (z_info_routers_zid(session, z_move(router_callback))) {
RCUTILS_LOG_ERROR_NAMED(
"ZenohRouterCheck",
"Failed to evaluate if Zenoh routers are connected to the session");
"rmw_zenoh_cpp",
"Failed to evaluate if Zenoh routers are connected to the session.");
ret = RMW_RET_ERROR;
} else {
if (context == 0) {
RCUTILS_LOG_ERROR_NAMED(
"ZenohRouterCheck",
"No Zenoh router connected to the session");
"rmw_zenoh_cpp",
"Unable to connect to a Zenoh router. "
"Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?");
ret = RMW_RET_ERROR;
}
}
Expand Down
44 changes: 29 additions & 15 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <new>
#include <string>
#include <thread>

#include "detail/guard_condition.hpp"
#include "detail/identifier.hpp"
Expand Down Expand Up @@ -175,10 +176,24 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
z_id_t zid = z_info_zid(z_loan(context->impl->session));
context->impl->graph_cache = std::make_unique<GraphCache>(zid);

// Verify if the zenoh router is running.
if ((ret = zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("Error while checking for Zenoh router");
return ret;
// Verify if the zenoh router is running if configured.
const std::optional<uint64_t> configured_connection_attempts = zenoh_router_check_attempts();
if (configured_connection_attempts.has_value()) {
ret = RMW_RET_ERROR;
uint64_t connection_attempts = 0;
// Retry until the connection is successful.
while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) {
if ((ret = zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) {
++connection_attempts;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if (ret != RMW_RET_OK) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"Unable to connect to a Zenoh router after %zu retries.",
configured_connection_attempts.value());
return ret;
}
}

// Initialize the shm manager if shared_memory is enabled in the config.
Expand Down Expand Up @@ -362,15 +377,6 @@ rmw_shutdown(rmw_context_t * context)
return RMW_RET_ERROR;
}

const rcutils_allocator_t * allocator = &context->options.allocator;

RMW_TRY_DESTRUCTOR(
static_cast<GuardCondition *>(context->impl->graph_guard_condition->data)->~GuardCondition(),
GuardCondition, );
allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state);

allocator->deallocate(context->impl->graph_guard_condition, allocator->state);

context->impl->is_shutdown = true;

return RMW_RET_OK;
Expand All @@ -396,10 +402,18 @@ rmw_context_fini(rmw_context_t * context)
return RMW_RET_INVALID_ARGUMENT;
}

RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, );

const rcutils_allocator_t * allocator = &context->options.allocator;

RMW_TRY_DESTRUCTOR(
static_cast<GuardCondition *>(context->impl->graph_guard_condition->data)->~GuardCondition(),
GuardCondition, );
allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state);

allocator->deallocate(context->impl->graph_guard_condition, allocator->state);
context->impl->graph_guard_condition = nullptr;

RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t, );

allocator->deallocate(context->impl, allocator->state);

rmw_ret_t ret = rmw_init_options_fini(&context->options);
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2277,7 +2277,7 @@ rmw_send_request(
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
// this window, the queryable will return an invalid reply. However, it is common for actions,
// which are implemented using services, to take an extended duration to complete.Hence, we set
// which are implemented using services, to take an extended duration to complete. Hence, we set
// the timeout_ms to the largest supported value to account for most realistic scenarios.
opts.timeout_ms = std::numeric_limits<uint64_t>::max();
// Latest consolidation guarantees unicity of replies for the same key expression,
Expand Down

0 comments on commit 59ffcdd

Please sign in to comment.