diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 51f84b33..f2f4256f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -32,6 +32,9 @@ #include "attachment_helpers.hpp" #include "rmw_data_types.hpp" +#include "zenoh_config.hpp" +#include "zenoh_router_check.hpp" + ///============================================================================= namespace @@ -60,6 +63,65 @@ size_t rmw_context_impl_s::get_next_entity_id() return next_entity_id_++; } +///============================================================================= +void rmw_context_impl_s::async_check_router() +{ + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + router_check_thread_ = std::thread( + [this](std::optional configured_connection_attempts, z_session_t session) + { + uint64_t connection_attempts = 0; + rmw_ret_t ret = RMW_RET_TIMEOUT; + // Retry until the connection is successful. + while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(session)) != RMW_RET_OK) { + ++connection_attempts; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + if (ret != RMW_RET_OK) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Unable to connect to a Zenoh router after %zu retries.", + configured_connection_attempts.value()); + // Return without setting router_is_available_ to true. + return; + } + // The router check has succeeded so we set router_is_available_ to true. + std::lock_guard lock(router_check_mutex_); + router_is_available_ = true; + router_check_cv_.notify_all(); + }, + configured_connection_attempts, + z_loan(session) + ); + } + else { + // If we skip the router check, directly set available_ to true. + std::lock_guard lock(router_check_mutex_); + router_is_available_ = true; + } +} + +///============================================================================= +bool rmw_context_impl_s::wait_for_router() +{ + // Wait until the cv is notified or router check fails. + std::unique_lock lock(router_check_mutex_); + router_check_cv_.wait(lock, [this]{ return router_is_available_ == true;}); + return router_is_available_; +} + +///============================================================================= +rmw_context_impl_s::~rmw_context_impl_s() +{ + if (router_check_thread_.joinable()) { + router_check_thread_.join(); + } +} + + namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 60e4bd01..3a0475e1 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -69,9 +70,23 @@ class rmw_context_impl_s final size_t get_next_entity_id(); + // Asynchronously check if a router is available in the network. + // Use wait_for_router() to wait until a router is discovered. + void async_check_router(); + + // Blocking call that will return false if the check for router failed. + bool wait_for_router(); + + ~rmw_context_impl_s(); + private: // A counter to assign a local id for every entity created in this session. size_t next_entity_id_{0}; + mutable std::mutex router_check_mutex_; + std::thread router_check_thread_; + bool router_is_available_{false}; + std::condition_variable router_check_cv_; + rmw_ret_t router_check_status_{RMW_RET_TIMEOUT}; }; namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 2e0d08c3..0c712325 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -23,7 +23,6 @@ #include "detail/liveliness_utils.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" -#include "detail/zenoh_router_check.hpp" #include "rcutils/env.h" #include "detail/logging_macros.hpp" @@ -201,26 +200,8 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_id_t zid = z_info_zid(z_loan(context->impl->session)); context->impl->graph_cache = std::make_unique(zid); - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - if (ret != RMW_RET_OK) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Unable to connect to a Zenoh router after %zu retries.", - configured_connection_attempts.value()); - return ret; - } - } + // Check if a zenoh router is available. + context->impl->async_check_router(); // Initialize the shm manager if shared_memory is enabled in the config. if (shm_enabled._cstr != nullptr && diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4b0cb076..c1740198 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -219,6 +219,11 @@ rmw_create_node( return nullptr; } + if (!context->impl->wait_for_router()){ + // Error handled. + return nullptr; + } + int validation_result = RMW_NODE_NAME_VALID; rmw_ret_t ret = rmw_validate_node_name(name, &validation_result, nullptr); if (RMW_RET_OK != ret) { @@ -1328,7 +1333,6 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; // Create the rmw_subscription. @@ -2078,6 +2082,7 @@ rmw_create_client( RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); const rmw_zenoh_cpp::rmw_node_data_t * node_data = static_cast(node->data);