Skip to content

Commit

Permalink
Check for zenoh router in a thread
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jul 12, 2024
1 parent 5fbf902 commit 9a150cc
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 22 deletions.
62 changes: 62 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"
#include "zenoh_config.hpp"
#include "zenoh_router_check.hpp"


///=============================================================================
namespace
Expand Down Expand Up @@ -60,6 +63,65 @@ size_t rmw_context_impl_s::get_next_entity_id()
return next_entity_id_++;
}

///=============================================================================
void rmw_context_impl_s::async_check_router()
{
const std::optional<uint64_t> configured_connection_attempts =
rmw_zenoh_cpp::zenoh_router_check_attempts();
if (configured_connection_attempts.has_value()) {
router_check_thread_ = std::thread(
[this](std::optional<uint64_t> configured_connection_attempts, z_session_t session)
{
uint64_t connection_attempts = 0;
rmw_ret_t ret = RMW_RET_TIMEOUT;
// Retry until the connection is successful.
while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) {
if ((ret = rmw_zenoh_cpp::zenoh_router_check(session)) != RMW_RET_OK) {
++connection_attempts;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if (ret != RMW_RET_OK) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"Unable to connect to a Zenoh router after %zu retries.",
configured_connection_attempts.value());
// Return without setting router_is_available_ to true.
return;
}
// The router check has succeeded so we set router_is_available_ to true.
std::lock_guard<std::mutex> lock(router_check_mutex_);
router_is_available_ = true;
router_check_cv_.notify_all();
},
configured_connection_attempts,
z_loan(session)
);
}
else {
// If we skip the router check, directly set available_ to true.
std::lock_guard<std::mutex> lock(router_check_mutex_);
router_is_available_ = true;
}
}

///=============================================================================
bool rmw_context_impl_s::wait_for_router()
{
// Wait until the cv is notified or router check fails.
std::unique_lock<std::mutex> lock(router_check_mutex_);
router_check_cv_.wait(lock, [this]{ return router_is_available_ == true;});
return router_is_available_;
}

///=============================================================================
rmw_context_impl_s::~rmw_context_impl_s()
{
if (router_check_thread_.joinable()) {
router_check_thread_.join();
}
}


namespace rmw_zenoh_cpp
{
///=============================================================================
Expand Down
15 changes: 15 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <optional>
#include <string>
#include <unordered_map>
#include <thread>
#include <utility>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -69,9 +70,23 @@ class rmw_context_impl_s final

size_t get_next_entity_id();

// Asynchronously check if a router is available in the network.
// Use wait_for_router() to wait until a router is discovered.
void async_check_router();

// Blocking call that will return false if the check for router failed.
bool wait_for_router();

~rmw_context_impl_s();

private:
// A counter to assign a local id for every entity created in this session.
size_t next_entity_id_{0};
mutable std::mutex router_check_mutex_;
std::thread router_check_thread_;
bool router_is_available_{false};
std::condition_variable router_check_cv_;
rmw_ret_t router_check_status_{RMW_RET_TIMEOUT};
};

namespace rmw_zenoh_cpp
Expand Down
23 changes: 2 additions & 21 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "detail/liveliness_utils.hpp"
#include "detail/rmw_data_types.hpp"
#include "detail/zenoh_config.hpp"
#include "detail/zenoh_router_check.hpp"

#include "rcutils/env.h"
#include "detail/logging_macros.hpp"
Expand Down Expand Up @@ -201,26 +200,8 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
z_id_t zid = z_info_zid(z_loan(context->impl->session));
context->impl->graph_cache = std::make_unique<rmw_zenoh_cpp::GraphCache>(zid);

// Verify if the zenoh router is running if configured.
const std::optional<uint64_t> configured_connection_attempts =
rmw_zenoh_cpp::zenoh_router_check_attempts();
if (configured_connection_attempts.has_value()) {
ret = RMW_RET_ERROR;
uint64_t connection_attempts = 0;
// Retry until the connection is successful.
while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) {
if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) {
++connection_attempts;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if (ret != RMW_RET_OK) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"Unable to connect to a Zenoh router after %zu retries.",
configured_connection_attempts.value());
return ret;
}
}
// Check if a zenoh router is available.
context->impl->async_check_router();

// Initialize the shm manager if shared_memory is enabled in the config.
if (shm_enabled._cstr != nullptr &&
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ rmw_create_node(
return nullptr;
}

if (!context->impl->wait_for_router()){
// Error handled.
return nullptr;
}

int validation_result = RMW_NODE_NAME_VALID;
rmw_ret_t ret = rmw_validate_node_name(name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
Expand Down Expand Up @@ -1328,7 +1333,6 @@ rmw_create_subscription(
RMW_SET_ERROR_MSG("zenoh session is invalid");
return nullptr;
}

rcutils_allocator_t * allocator = &node->context->options.allocator;

// Create the rmw_subscription.
Expand Down Expand Up @@ -2078,6 +2082,7 @@ rmw_create_client(
RMW_SET_ERROR_MSG("zenoh session is invalid");
return nullptr;
}

RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr);
const rmw_zenoh_cpp::rmw_node_data_t * node_data =
static_cast<rmw_zenoh_cpp::rmw_node_data_t *>(node->data);
Expand Down

0 comments on commit 9a150cc

Please sign in to comment.