diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp index b40f7048..10d7a618 100644 --- a/rmw_zenoh_cpp/src/detail/event.cpp +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -103,22 +103,6 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id) return; } -///============================================================================= -bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const -{ - if (event_id > ZENOH_EVENT_ID_MAX) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " - "Report this bug.", - event_id); - return true; - } - - std::lock_guard lock(event_mutex_); - - return event_queues_[event_id].empty(); -} - ///============================================================================= std::unique_ptr EventsManager::pop_next_event( rmw_zenoh_event_type_t event_id) @@ -183,7 +167,7 @@ void EventsManager::add_new_event( } ///============================================================================= -void EventsManager::attach_event_condition( +bool EventsManager::queue_has_data_and_attach_condition_if_not( rmw_zenoh_event_type_t event_id, std::condition_variable * condition_variable) { @@ -192,26 +176,36 @@ void EventsManager::attach_event_condition( "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " "Report this bug.", event_id); - return; + return false; } std::lock_guard lock(event_condition_mutex_); + + if (!event_queues_[event_id].empty()) { + return true; + } + event_conditions_[event_id] = condition_variable; + + return false; } ///============================================================================= -void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id) +bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " "Report this bug.", event_id); - return; + return true; } std::lock_guard lock(event_condition_mutex_); + event_conditions_[event_id] = nullptr; + + return event_queues_[event_id].empty(); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index ca18ca30..25f5dfe9 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -120,10 +120,6 @@ class EventsManager rmw_event_callback_t callback, const void * user_data); - /// @brief Returns true if the event queue is empty. - /// @param event_id the event id whose event queue should be checked. - bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const; - /// Pop the next event in the queue. /// @param event_id the event id whose queue should be popped. std::unique_ptr pop_next_event( @@ -137,12 +133,12 @@ class EventsManager /// @brief Attach the condition variable provided by rmw_wait. /// @param condition_variable to attach. - void attach_event_condition( + bool queue_has_data_and_attach_condition_if_not( rmw_zenoh_event_type_t event_id, std::condition_variable * condition_variable); /// @brief Detach the condition variable provided by rmw_wait. - void detach_event_condition(rmw_zenoh_event_type_t event_id); + bool detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id); private: /// @brief Trigger the callback for an event. diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp index 76f1c1f8..b9f78740 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp @@ -40,29 +40,23 @@ void GuardCondition::trigger() } ///============================================================================= -void GuardCondition::attach_condition(std::condition_variable * condition_variable) +bool GuardCondition::check_and_attach_condition_if_not(std::condition_variable * condition_variable) { std::lock_guard lock(internal_mutex_); + if (has_triggered_) { + return true; + } condition_variable_ = condition_variable; + + return false; } ///============================================================================= -void GuardCondition::detach_condition() +bool GuardCondition::detach_condition_and_trigger_set() { std::lock_guard lock(internal_mutex_); condition_variable_ = nullptr; -} - -bool GuardCondition::has_triggered() const -{ - std::lock_guard lock(internal_mutex_); - return has_triggered_; -} -///============================================================================= -bool GuardCondition::get_and_reset_trigger() -{ - std::lock_guard lock(internal_mutex_); bool ret = has_triggered_; has_triggered_ = false; diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp index 9adb737e..96e3b3bd 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp @@ -31,13 +31,9 @@ class GuardCondition final // Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set. void trigger(); - void attach_condition(std::condition_variable * condition_variable); + bool check_and_attach_condition_if_not(std::condition_variable * condition_variable); - void detach_condition(); - - bool has_triggered() const; - - bool get_and_reset_trigger(); + bool detach_condition_and_trigger_set(); private: mutable std::mutex internal_mutex_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 0f14b9af..4445d89e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -83,10 +83,17 @@ size_t rmw_publisher_data_t::get_next_sequence_number() } ///============================================================================= -void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) +bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( + std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); + if (!message_queue_.empty()) { + return true; + } + condition_ = condition_variable; + + return false; } ///============================================================================= @@ -99,16 +106,11 @@ void rmw_subscription_data_t::notify() } ///============================================================================= -void rmw_subscription_data_t::detach_condition() +bool rmw_subscription_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); condition_ = nullptr; -} -///============================================================================= -bool rmw_subscription_data_t::message_queue_is_empty() const -{ - std::lock_guard lock(message_queue_mutex_); return message_queue_.empty(); } @@ -180,24 +182,25 @@ void rmw_subscription_data_t::add_new_message( } ///============================================================================= -bool rmw_service_data_t::query_queue_is_empty() const -{ - std::lock_guard lock(query_queue_mutex_); - return query_queue_.empty(); -} - -///============================================================================= -void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable) +bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( + std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); + if (!query_queue_.empty()) { + return true; + } condition_ = condition_variable; + + return false; } ///============================================================================= -void rmw_service_data_t::detach_condition() +bool rmw_service_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); condition_ = nullptr; + + return query_queue_.empty(); } ///============================================================================= @@ -338,25 +341,25 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr reply) } ///============================================================================= -bool rmw_client_data_t::reply_queue_is_empty() const -{ - std::lock_guard lock(reply_queue_mutex_); - - return reply_queue_.empty(); -} - -///============================================================================= -void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable) +bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( + std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); + if (!reply_queue_.empty()) { + return true; + } condition_ = condition_variable; + + return false; } ///============================================================================= -void rmw_client_data_t::detach_condition() +bool rmw_client_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); condition_ = nullptr; + + return reply_queue_.empty(); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 1ae8b736..bfdff21b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -181,11 +181,9 @@ class rmw_subscription_data_t final MessageTypeSupport * type_support; rmw_context_t * context; - void attach_condition(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); - void detach_condition(); - - bool message_queue_is_empty() const; + bool detach_condition_and_queue_is_empty(); std::unique_ptr pop_next_message(); @@ -255,11 +253,9 @@ class rmw_service_data_t final rmw_context_t * context; - bool query_queue_is_empty() const; - - void attach_condition(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); - void detach_condition(); + bool detach_condition_and_queue_is_empty(); std::unique_ptr pop_next_query(); @@ -333,11 +329,9 @@ class rmw_client_data_t final void add_new_reply(std::unique_ptr reply); - bool reply_queue_is_empty() const; - - void attach_condition(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); - void detach_condition(); + bool detach_condition_and_queue_is_empty(); std::unique_ptr pop_next_reply(); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0356be80..c57f73a7 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3277,12 +3277,11 @@ static bool check_and_attach_condition( for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { rmw_zenoh_cpp::GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr) { - if (gc->has_triggered()) { - return true; - } - - gc->attach_condition(&wait_set_data->condition_variable); + if (gc == nullptr) { + continue; + } + if (gc->check_and_attach_condition_if_not(&wait_set_data->condition_variable)) { + return true; } } } @@ -3294,13 +3293,12 @@ static bool check_and_attach_condition( if (zenoh_event_it != rmw_zenoh_cpp::event_map.end()) { auto event_data = static_cast(event->data); if (event_data != nullptr) { - if (!event_data->event_queue_is_empty(zenoh_event_it->second)) { + if (event_data->queue_has_data_and_attach_condition_if_not( + zenoh_event_it->second, + &wait_set_data->condition_variable)) + { return true; } - - event_data->attach_event_condition( - zenoh_event_it->second, - &wait_set_data->condition_variable); } } else { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -3314,12 +3312,13 @@ static bool check_and_attach_condition( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); - if (sub_data != nullptr) { - if (!sub_data->message_queue_is_empty()) { - return true; - } - - sub_data->attach_condition(&wait_set_data->condition_variable); + if (sub_data == nullptr) { + continue; + } + if (sub_data->queue_has_data_and_attach_condition_if_not( + &wait_set_data->condition_variable)) + { + return true; } } } @@ -3327,12 +3326,13 @@ static bool check_and_attach_condition( if (services) { for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); - if (serv_data != nullptr) { - if (!serv_data->query_queue_is_empty()) { - return true; - } - - serv_data->attach_condition(&wait_set_data->condition_variable); + if (serv_data == nullptr) { + continue; + } + if (serv_data->queue_has_data_and_attach_condition_if_not( + &wait_set_data->condition_variable)) + { + return true; } } } @@ -3341,12 +3341,13 @@ static bool check_and_attach_condition( for (size_t i = 0; i < clients->client_count; ++i) { rmw_zenoh_cpp::rmw_client_data_t * client_data = static_cast(clients->clients[i]); - if (client_data != nullptr) { - if (!client_data->reply_queue_is_empty()) { - return true; - } - - client_data->attach_condition(&wait_set_data->condition_variable); + if (client_data == nullptr) { + continue; + } + if (client_data->queue_has_data_and_attach_condition_if_not( + &wait_set_data->condition_variable)) + { + return true; } } } @@ -3415,34 +3416,30 @@ rmw_wait( } } + // According to the documentation for rmw_wait in rmw.h, entries in the various arrays that have + // *not* been triggered should be set to NULL if (guard_conditions) { - // Now detach the condition variable and mutex from each of the guard conditions for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { rmw_zenoh_cpp::GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); - if (gc != nullptr) { - gc->detach_condition(); - // According to the documentation for rmw_wait in rmw.h, entries in the - // array that have *not* been triggered should be set to NULL - if (!gc->get_and_reset_trigger()) { - guard_conditions->guard_conditions[i] = nullptr; - } + if (gc == nullptr) { + continue; + } + if (!gc->detach_condition_and_trigger_set()) { + // Setting to nullptr lets rcl know that this guard condition is not ready + guard_conditions->guard_conditions[i] = nullptr; } } } if (events) { - // Now detach the condition variable and mutex from each of the subscriptions for (size_t i = 0; i < events->event_count; ++i) { auto event = static_cast(events->events[i]); auto event_data = static_cast(event->data); if (event_data != nullptr) { auto zenoh_event_it = rmw_zenoh_cpp::event_map.find(event->event_type); if (zenoh_event_it != rmw_zenoh_cpp::event_map.end()) { - event_data->detach_event_condition(zenoh_event_it->second); - // According to the documentation for rmw_wait in rmw.h, entries in the - // array that have *not* been triggered should be set to NULL - if (event_data->event_queue_is_empty(zenoh_event_it->second)) { + if (event_data->detach_condition_and_event_queue_is_empty(zenoh_event_it->second)) { // Setting to nullptr lets rcl know that this subscription is not ready events->events[i] = nullptr; } @@ -3452,51 +3449,45 @@ rmw_wait( } if (subscriptions) { - // Now detach the condition variable and mutex from each of the subscriptions for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); - if (sub_data != nullptr) { - sub_data->detach_condition(); - // According to the documentation for rmw_wait in rmw.h, entries in the - // array that have *not* been triggered should be set to NULL - if (sub_data->message_queue_is_empty()) { - // Setting to nullptr lets rcl know that this subscription is not ready - subscriptions->subscribers[i] = nullptr; - } + if (sub_data == nullptr) { + continue; + } + + if (sub_data->detach_condition_and_queue_is_empty()) { + // Setting to nullptr lets rcl know that this subscription is not ready + subscriptions->subscribers[i] = nullptr; } } } if (services) { - // Now detach the condition variable and mutex from each of the services for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); - if (serv_data != nullptr) { - serv_data->detach_condition(); - // According to the documentation for rmw_wait in rmw.h, entries in the - // array that have *not* been triggered should be set to NULL - if (serv_data->query_queue_is_empty()) { - // Setting to nullptr lets rcl know that this service is not ready - services->services[i] = nullptr; - } + if (serv_data == nullptr) { + continue; + } + + if (serv_data->detach_condition_and_queue_is_empty()) { + // Setting to nullptr lets rcl know that this service is not ready + services->services[i] = nullptr; } } } if (clients) { - // Now detach the condition variable and mutex from each of the clients for (size_t i = 0; i < clients->client_count; ++i) { rmw_zenoh_cpp::rmw_client_data_t * client_data = static_cast(clients->clients[i]); - if (client_data != nullptr) { - client_data->detach_condition(); - // According to the documentation for rmw_wait in rmw.h, entries in the - // array that have *not* been triggered should be set to NULL - if (client_data->reply_queue_is_empty()) { - // Setting to nullptr lets rcl know that this client is not ready - clients->clients[i] = nullptr; - } + if (client_data == nullptr) { + continue; + } + + if (client_data->detach_condition_and_queue_is_empty()) { + // Setting to nullptr lets rcl know that this client is not ready + clients->clients[i] = nullptr; } } }