From f9ee9ed31a48a798a82148436dd2a6ba2c475e08 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 21 Jun 2024 00:06:50 +0000 Subject: [PATCH] Fix a race condition in rmw_wait. In the current code, there is a race condition in rmw_wait. This happens because of the following sequence: 1. Without taking a lock, we check if any of the entities in the wait_set are ready, and if not attach the condition_variable to it. 2. Then we take the lock, and go to sleep on the condition_variable. However, doing step 1 takes time, and we check in a particular order: guard_conditions, events, subscriptions, services, clients. The race happens because a subscription may come in after we've checked subscriptions in the above list (while we are checking services and clients). In that case, we'll unnecessarily go to sleep on the condition_variable, even though something is ready. This increases the latency. To solve the issue, we still do all of the checking and attaching unlocked. However, we update the "notify" method of each entity so that it takes the condition_variable lock, and in addition to kicking the condition_variable it sets a boolean in the wait_set structure to "true". Then, in rmw_wait(), after we have finished attaching, we take the lock, and set a predicate on the condition_variable so that it will quit if wait_set_data->triggered is true. Thus, if one of the entities became ready after we checked, we'll notice it and not go to sleep. This also allows us to deal with "spurious" wakeups, where the condition_variable was woken up even though nothing in this wait_set became ready. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/event.cpp | 12 +++--- rmw_zenoh_cpp/src/detail/event.hpp | 6 ++- rmw_zenoh_cpp/src/detail/guard_condition.cpp | 20 ++++++---- rmw_zenoh_cpp/src/detail/guard_condition.hpp | 8 ++-- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 36 ++++++++++-------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 22 ++++------- .../src/detail/rmw_wait_set_data.hpp | 37 +++++++++++++++++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 31 +++++++++------- 8 files changed, 111 insertions(+), 61 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp index 10d7a618..03bec0ec 100644 --- a/rmw_zenoh_cpp/src/detail/event.cpp +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -169,7 +169,7 @@ void EventsManager::add_new_event( ///============================================================================= bool EventsManager::queue_has_data_and_attach_condition_if_not( rmw_zenoh_event_type_t event_id, - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -185,7 +185,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not( return true; } - event_conditions_[event_id] = condition_variable; + wait_set_data_[event_id] = wait_set_data; return false; } @@ -203,7 +203,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty std::lock_guard lock(event_condition_mutex_); - event_conditions_[event_id] = nullptr; + wait_set_data_[event_id] = nullptr; return event_queues_[event_id].empty(); } @@ -220,8 +220,10 @@ void EventsManager::notify_event(rmw_zenoh_event_type_t event_id) } std::lock_guard lock(event_condition_mutex_); - if (event_conditions_[event_id] != nullptr) { - event_conditions_[event_id]->notify_one(); + if (wait_set_data_[event_id] != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_[event_id]->condition_mutex); + wait_set_data_[event_id]->triggered = true; + wait_set_data_[event_id]->condition_variable.notify_one(); } } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index 25f5dfe9..96f8629a 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -25,6 +25,8 @@ #include "rmw/event.h" #include "rmw/event_callback_type.h" +#include "rmw_wait_set_data.hpp" + namespace rmw_zenoh_cpp { ///============================================================================= @@ -135,7 +137,7 @@ class EventsManager /// @param condition_variable to attach. bool queue_has_data_and_attach_condition_if_not( rmw_zenoh_event_type_t event_id, - std::condition_variable * condition_variable); + rmw_wait_set_data_t * wait_set_data); /// @brief Detach the condition variable provided by rmw_wait. bool detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id); @@ -153,7 +155,7 @@ class EventsManager /// Mutex to lock for event_condition. mutable std::mutex event_condition_mutex_; /// Condition variable to attach for event notifications. - std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; + rmw_wait_set_data_t * wait_set_data_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp index b9f78740..1b6b6ba0 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp @@ -13,14 +13,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "guard_condition.hpp" +#include "rmw_wait_set_data.hpp" namespace rmw_zenoh_cpp { ///============================================================================= GuardCondition::GuardCondition() : has_triggered_(false), - condition_variable_(nullptr) + wait_set_data_(nullptr) { } @@ -34,28 +37,31 @@ void GuardCondition::trigger() // be called has_triggered_ = true; - if (condition_variable_ != nullptr) { - condition_variable_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } ///============================================================================= -bool GuardCondition::check_and_attach_condition_if_not(std::condition_variable * condition_variable) +bool GuardCondition::check_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(internal_mutex_); if (has_triggered_) { return true; } - condition_variable_ = condition_variable; + + wait_set_data_ = wait_set_data; return false; } ///============================================================================= -bool GuardCondition::detach_condition_and_trigger_set() +bool GuardCondition::detach_condition_and_is_trigger_set() { std::lock_guard lock(internal_mutex_); - condition_variable_ = nullptr; + wait_set_data_ = nullptr; bool ret = has_triggered_; diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp index 96e3b3bd..b0bfc2bb 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp @@ -20,6 +20,8 @@ #include #include +#include "rmw_wait_set_data.hpp" + namespace rmw_zenoh_cpp { ///============================================================================= @@ -31,14 +33,14 @@ class GuardCondition final // Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set. void trigger(); - bool check_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool check_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - bool detach_condition_and_trigger_set(); + bool detach_condition_and_is_trigger_set(); private: mutable std::mutex internal_mutex_; std::atomic_bool has_triggered_; - std::condition_variable * condition_variable_; + rmw_wait_set_data_t * wait_set_data_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 4445d89e..c2b94ffb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -84,14 +84,14 @@ size_t rmw_publisher_data_t::get_next_sequence_number() ///============================================================================= bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(condition_mutex_); if (!message_queue_.empty()) { return true; } - condition_ = condition_variable; + wait_set_data_ = wait_set_data; return false; } @@ -100,8 +100,10 @@ bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( void rmw_subscription_data_t::notify() { std::lock_guard lock(condition_mutex_); - if (condition_ != nullptr) { - condition_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } @@ -109,7 +111,7 @@ void rmw_subscription_data_t::notify() bool rmw_subscription_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); - condition_ = nullptr; + wait_set_data_ = nullptr; return message_queue_.empty(); } @@ -183,13 +185,13 @@ void rmw_subscription_data_t::add_new_message( ///============================================================================= bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(condition_mutex_); if (!query_queue_.empty()) { return true; } - condition_ = condition_variable; + wait_set_data_ = wait_set_data; return false; } @@ -198,7 +200,7 @@ bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( bool rmw_service_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); - condition_ = nullptr; + wait_set_data_ = nullptr; return query_queue_.empty(); } @@ -221,8 +223,10 @@ std::unique_ptr rmw_service_data_t::pop_next_query() void rmw_service_data_t::notify() { std::lock_guard lock(condition_mutex_); - if (condition_ != nullptr) { - condition_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } @@ -312,8 +316,10 @@ std::unique_ptr rmw_service_data_t::take_from_query_map( void rmw_client_data_t::notify() { std::lock_guard lock(condition_mutex_); - if (condition_ != nullptr) { - condition_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } @@ -342,13 +348,13 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr reply) ///============================================================================= bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(condition_mutex_); if (!reply_queue_.empty()) { return true; } - condition_ = condition_variable; + wait_set_data_ = wait_set_data; return false; } @@ -357,7 +363,7 @@ bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( bool rmw_client_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); - condition_ = nullptr; + wait_set_data_ = nullptr; return reply_queue_.empty(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index bfdff21b..f10e84f3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -37,6 +37,7 @@ #include "event.hpp" #include "graph_cache.hpp" #include "message_type_support.hpp" +#include "rmw_wait_set_data.hpp" #include "service_type_support.hpp" /// Structs for various type erased data fields. @@ -128,15 +129,6 @@ class rmw_publisher_data_t final size_t sequence_number_{1}; }; -///============================================================================= -struct rmw_wait_set_data_t -{ - std::condition_variable condition_variable; - std::mutex condition_mutex; - - rmw_context_t * context; -}; - ///============================================================================= // z_owned_closure_sample_t void sub_data_handler(const z_sample_t * sample, void * sub_data); @@ -181,7 +173,7 @@ class rmw_subscription_data_t final MessageTypeSupport * type_support; rmw_context_t * context; - bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); bool detach_condition_and_queue_is_empty(); @@ -202,7 +194,7 @@ class rmw_subscription_data_t final void notify(); - std::condition_variable * condition_{nullptr}; + rmw_wait_set_data_t * wait_set_data_{nullptr}; std::mutex condition_mutex_; }; @@ -253,7 +245,7 @@ class rmw_service_data_t final rmw_context_t * context; - bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); bool detach_condition_and_queue_is_empty(); @@ -279,7 +271,7 @@ class rmw_service_data_t final std::unordered_map sequence_to_query_map_; std::mutex sequence_to_query_map_mutex_; - std::condition_variable * condition_{nullptr}; + rmw_wait_set_data_t * wait_set_data_{nullptr}; std::mutex condition_mutex_; }; @@ -329,7 +321,7 @@ class rmw_client_data_t final void add_new_reply(std::unique_ptr reply); - bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); bool detach_condition_and_queue_is_empty(); @@ -343,7 +335,7 @@ class rmw_client_data_t final size_t sequence_number_{1}; std::mutex sequence_number_mutex_; - std::condition_variable * condition_{nullptr}; + rmw_wait_set_data_t * wait_set_data_{nullptr}; std::mutex condition_mutex_; std::deque> reply_queue_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp new file mode 100644 index 00000000..ae78c6cf --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp @@ -0,0 +1,37 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_WAIT_SET_DATA_HPP_ +#define DETAIL__RMW_WAIT_SET_DATA_HPP_ + +#include +#include + +#include "rmw/rmw.h" + +namespace rmw_zenoh_cpp +{ + +struct rmw_wait_set_data_t +{ + std::condition_variable condition_variable; + std::mutex condition_mutex; + + bool triggered{false}; + + rmw_context_t * context; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_WAIT_SET_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6b70f32b..f7c793b3 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3292,7 +3292,7 @@ static bool check_and_attach_condition( if (gc == nullptr) { continue; } - if (gc->check_and_attach_condition_if_not(&wait_set_data->condition_variable)) { + if (gc->check_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3307,7 +3307,7 @@ static bool check_and_attach_condition( if (event_data != nullptr) { if (event_data->queue_has_data_and_attach_condition_if_not( zenoh_event_it->second, - &wait_set_data->condition_variable)) + wait_set_data)) { return true; } @@ -3327,9 +3327,7 @@ static bool check_and_attach_condition( if (sub_data == nullptr) { continue; } - if (sub_data->queue_has_data_and_attach_condition_if_not( - &wait_set_data->condition_variable)) - { + if (sub_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3341,9 +3339,7 @@ static bool check_and_attach_condition( if (serv_data == nullptr) { continue; } - if (serv_data->queue_has_data_and_attach_condition_if_not( - &wait_set_data->condition_variable)) - { + if (serv_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3356,9 +3352,7 @@ static bool check_and_attach_condition( if (client_data == nullptr) { continue; } - if (client_data->queue_has_data_and_attach_condition_if_not( - &wait_set_data->condition_variable)) - { + if (client_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3416,13 +3410,22 @@ rmw_wait( // "wait forever", if it specified as 0 it means "never wait", and if it is anything else wait // for that amount of time. if (wait_timeout == nullptr) { - wait_set_data->condition_variable.wait(lock); + wait_set_data->condition_variable.wait( + lock, [wait_set_data]() { + return wait_set_data->triggered; + }); } else { if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { wait_set_data->condition_variable.wait_for( - lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); + lock, + std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec)), + [wait_set_data]() {return wait_set_data->triggered;}); } } + + // We reset here (while still holding the lock) because we only cared about this through this + // particular call. + wait_set_data->triggered = false; } bool wait_result = false; @@ -3436,7 +3439,7 @@ rmw_wait( if (gc == nullptr) { continue; } - if (!gc->detach_condition_and_trigger_set()) { + if (!gc->detach_condition_and_is_trigger_set()) { // Setting to nullptr lets rcl know that this guard condition is not ready guard_conditions->guard_conditions[i] = nullptr; } else {