diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp index 10d7a618..03bec0ec 100644 --- a/rmw_zenoh_cpp/src/detail/event.cpp +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -169,7 +169,7 @@ void EventsManager::add_new_event( ///============================================================================= bool EventsManager::queue_has_data_and_attach_condition_if_not( rmw_zenoh_event_type_t event_id, - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -185,7 +185,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not( return true; } - event_conditions_[event_id] = condition_variable; + wait_set_data_[event_id] = wait_set_data; return false; } @@ -203,7 +203,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty std::lock_guard lock(event_condition_mutex_); - event_conditions_[event_id] = nullptr; + wait_set_data_[event_id] = nullptr; return event_queues_[event_id].empty(); } @@ -220,8 +220,10 @@ void EventsManager::notify_event(rmw_zenoh_event_type_t event_id) } std::lock_guard lock(event_condition_mutex_); - if (event_conditions_[event_id] != nullptr) { - event_conditions_[event_id]->notify_one(); + if (wait_set_data_[event_id] != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_[event_id]->condition_mutex); + wait_set_data_[event_id]->triggered = true; + wait_set_data_[event_id]->condition_variable.notify_one(); } } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index 25f5dfe9..96f8629a 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -25,6 +25,8 @@ #include "rmw/event.h" #include "rmw/event_callback_type.h" +#include "rmw_wait_set_data.hpp" + namespace rmw_zenoh_cpp { ///============================================================================= @@ -135,7 +137,7 @@ class EventsManager /// @param condition_variable to attach. bool queue_has_data_and_attach_condition_if_not( rmw_zenoh_event_type_t event_id, - std::condition_variable * condition_variable); + rmw_wait_set_data_t * wait_set_data); /// @brief Detach the condition variable provided by rmw_wait. bool detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id); @@ -153,7 +155,7 @@ class EventsManager /// Mutex to lock for event_condition. mutable std::mutex event_condition_mutex_; /// Condition variable to attach for event notifications. - std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; + rmw_wait_set_data_t * wait_set_data_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp index b9f78740..1b6b6ba0 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp @@ -13,14 +13,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "guard_condition.hpp" +#include "rmw_wait_set_data.hpp" namespace rmw_zenoh_cpp { ///============================================================================= GuardCondition::GuardCondition() : has_triggered_(false), - condition_variable_(nullptr) + wait_set_data_(nullptr) { } @@ -34,28 +37,31 @@ void GuardCondition::trigger() // be called has_triggered_ = true; - if (condition_variable_ != nullptr) { - condition_variable_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } ///============================================================================= -bool GuardCondition::check_and_attach_condition_if_not(std::condition_variable * condition_variable) +bool GuardCondition::check_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(internal_mutex_); if (has_triggered_) { return true; } - condition_variable_ = condition_variable; + + wait_set_data_ = wait_set_data; return false; } ///============================================================================= -bool GuardCondition::detach_condition_and_trigger_set() +bool GuardCondition::detach_condition_and_is_trigger_set() { std::lock_guard lock(internal_mutex_); - condition_variable_ = nullptr; + wait_set_data_ = nullptr; bool ret = has_triggered_; diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp index 96e3b3bd..b0bfc2bb 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp @@ -20,6 +20,8 @@ #include #include +#include "rmw_wait_set_data.hpp" + namespace rmw_zenoh_cpp { ///============================================================================= @@ -31,14 +33,14 @@ class GuardCondition final // Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set. void trigger(); - bool check_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool check_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - bool detach_condition_and_trigger_set(); + bool detach_condition_and_is_trigger_set(); private: mutable std::mutex internal_mutex_; std::atomic_bool has_triggered_; - std::condition_variable * condition_variable_; + rmw_wait_set_data_t * wait_set_data_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 4445d89e..c2b94ffb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -84,14 +84,14 @@ size_t rmw_publisher_data_t::get_next_sequence_number() ///============================================================================= bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(condition_mutex_); if (!message_queue_.empty()) { return true; } - condition_ = condition_variable; + wait_set_data_ = wait_set_data; return false; } @@ -100,8 +100,10 @@ bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( void rmw_subscription_data_t::notify() { std::lock_guard lock(condition_mutex_); - if (condition_ != nullptr) { - condition_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } @@ -109,7 +111,7 @@ void rmw_subscription_data_t::notify() bool rmw_subscription_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); - condition_ = nullptr; + wait_set_data_ = nullptr; return message_queue_.empty(); } @@ -183,13 +185,13 @@ void rmw_subscription_data_t::add_new_message( ///============================================================================= bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(condition_mutex_); if (!query_queue_.empty()) { return true; } - condition_ = condition_variable; + wait_set_data_ = wait_set_data; return false; } @@ -198,7 +200,7 @@ bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( bool rmw_service_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); - condition_ = nullptr; + wait_set_data_ = nullptr; return query_queue_.empty(); } @@ -221,8 +223,10 @@ std::unique_ptr rmw_service_data_t::pop_next_query() void rmw_service_data_t::notify() { std::lock_guard lock(condition_mutex_); - if (condition_ != nullptr) { - condition_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } @@ -312,8 +316,10 @@ std::unique_ptr rmw_service_data_t::take_from_query_map( void rmw_client_data_t::notify() { std::lock_guard lock(condition_mutex_); - if (condition_ != nullptr) { - condition_->notify_one(); + if (wait_set_data_ != nullptr) { + std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); } } @@ -342,13 +348,13 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr reply) ///============================================================================= bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( - std::condition_variable * condition_variable) + rmw_wait_set_data_t * wait_set_data) { std::lock_guard lock(condition_mutex_); if (!reply_queue_.empty()) { return true; } - condition_ = condition_variable; + wait_set_data_ = wait_set_data; return false; } @@ -357,7 +363,7 @@ bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not( bool rmw_client_data_t::detach_condition_and_queue_is_empty() { std::lock_guard lock(condition_mutex_); - condition_ = nullptr; + wait_set_data_ = nullptr; return reply_queue_.empty(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index bfdff21b..f10e84f3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -37,6 +37,7 @@ #include "event.hpp" #include "graph_cache.hpp" #include "message_type_support.hpp" +#include "rmw_wait_set_data.hpp" #include "service_type_support.hpp" /// Structs for various type erased data fields. @@ -128,15 +129,6 @@ class rmw_publisher_data_t final size_t sequence_number_{1}; }; -///============================================================================= -struct rmw_wait_set_data_t -{ - std::condition_variable condition_variable; - std::mutex condition_mutex; - - rmw_context_t * context; -}; - ///============================================================================= // z_owned_closure_sample_t void sub_data_handler(const z_sample_t * sample, void * sub_data); @@ -181,7 +173,7 @@ class rmw_subscription_data_t final MessageTypeSupport * type_support; rmw_context_t * context; - bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); bool detach_condition_and_queue_is_empty(); @@ -202,7 +194,7 @@ class rmw_subscription_data_t final void notify(); - std::condition_variable * condition_{nullptr}; + rmw_wait_set_data_t * wait_set_data_{nullptr}; std::mutex condition_mutex_; }; @@ -253,7 +245,7 @@ class rmw_service_data_t final rmw_context_t * context; - bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); bool detach_condition_and_queue_is_empty(); @@ -279,7 +271,7 @@ class rmw_service_data_t final std::unordered_map sequence_to_query_map_; std::mutex sequence_to_query_map_mutex_; - std::condition_variable * condition_{nullptr}; + rmw_wait_set_data_t * wait_set_data_{nullptr}; std::mutex condition_mutex_; }; @@ -329,7 +321,7 @@ class rmw_client_data_t final void add_new_reply(std::unique_ptr reply); - bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable); + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); bool detach_condition_and_queue_is_empty(); @@ -343,7 +335,7 @@ class rmw_client_data_t final size_t sequence_number_{1}; std::mutex sequence_number_mutex_; - std::condition_variable * condition_{nullptr}; + rmw_wait_set_data_t * wait_set_data_{nullptr}; std::mutex condition_mutex_; std::deque> reply_queue_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp new file mode 100644 index 00000000..ae78c6cf --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp @@ -0,0 +1,37 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_WAIT_SET_DATA_HPP_ +#define DETAIL__RMW_WAIT_SET_DATA_HPP_ + +#include +#include + +#include "rmw/rmw.h" + +namespace rmw_zenoh_cpp +{ + +struct rmw_wait_set_data_t +{ + std::condition_variable condition_variable; + std::mutex condition_mutex; + + bool triggered{false}; + + rmw_context_t * context; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_WAIT_SET_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6b70f32b..f7c793b3 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -3292,7 +3292,7 @@ static bool check_and_attach_condition( if (gc == nullptr) { continue; } - if (gc->check_and_attach_condition_if_not(&wait_set_data->condition_variable)) { + if (gc->check_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3307,7 +3307,7 @@ static bool check_and_attach_condition( if (event_data != nullptr) { if (event_data->queue_has_data_and_attach_condition_if_not( zenoh_event_it->second, - &wait_set_data->condition_variable)) + wait_set_data)) { return true; } @@ -3327,9 +3327,7 @@ static bool check_and_attach_condition( if (sub_data == nullptr) { continue; } - if (sub_data->queue_has_data_and_attach_condition_if_not( - &wait_set_data->condition_variable)) - { + if (sub_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3341,9 +3339,7 @@ static bool check_and_attach_condition( if (serv_data == nullptr) { continue; } - if (serv_data->queue_has_data_and_attach_condition_if_not( - &wait_set_data->condition_variable)) - { + if (serv_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3356,9 +3352,7 @@ static bool check_and_attach_condition( if (client_data == nullptr) { continue; } - if (client_data->queue_has_data_and_attach_condition_if_not( - &wait_set_data->condition_variable)) - { + if (client_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) { return true; } } @@ -3416,13 +3410,22 @@ rmw_wait( // "wait forever", if it specified as 0 it means "never wait", and if it is anything else wait // for that amount of time. if (wait_timeout == nullptr) { - wait_set_data->condition_variable.wait(lock); + wait_set_data->condition_variable.wait( + lock, [wait_set_data]() { + return wait_set_data->triggered; + }); } else { if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { wait_set_data->condition_variable.wait_for( - lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); + lock, + std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec)), + [wait_set_data]() {return wait_set_data->triggered;}); } } + + // We reset here (while still holding the lock) because we only cared about this through this + // particular call. + wait_set_data->triggered = false; } bool wait_result = false; @@ -3436,7 +3439,7 @@ rmw_wait( if (gc == nullptr) { continue; } - if (!gc->detach_condition_and_trigger_set()) { + if (!gc->detach_condition_and_is_trigger_set()) { // Setting to nullptr lets rcl know that this guard condition is not ready guard_conditions->guard_conditions[i] = nullptr; } else {