Skip to content

Commit

Permalink
Fix a race condition in rmw_wait.
Browse files Browse the repository at this point in the history
In the current code, there is a race condition in rmw_wait.
This happens because of the following sequence:

1.  Without taking a lock, we check if any of the entities in the
wait_set are ready, and if not attach the condition_variable to it.
2.  Then we take the lock, and go to sleep on the condition_variable.

However, doing step 1 takes time, and we check in a particular order:
guard_conditions, events, subscriptions, services, clients.  The race
happens because a subscription may come in after we've checked
subscriptions in the above list (while we are checking services and
clients).  In that case, we'll unnecessarily go to sleep on the
condition_variable, even though something is ready.  This increases the
latency.

To solve the issue, we still do all of the checking and attaching
unlocked.  However, we update the "notify" method of each entity so that
it takes the condition_variable lock, and in addition to kicking the
condition_variable it sets a boolean in the wait_set structure to
"true".  Then, in rmw_wait(), after we have finished attaching, we take
the lock, and set a predicate on the condition_variable so that it will
quit if wait_set_data->triggered is true.  Thus, if one of the entities
became ready after we checked, we'll notice it and not go to sleep.

This also allows us to deal with "spurious" wakeups, where the
condition_variable was woken up even though nothing in this wait_set
became ready.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jun 25, 2024
1 parent 61f2a70 commit f9ee9ed
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 61 deletions.
12 changes: 7 additions & 5 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ void EventsManager::add_new_event(
///=============================================================================
bool EventsManager::queue_has_data_and_attach_condition_if_not(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
rmw_wait_set_data_t * wait_set_data)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -185,7 +185,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not(
return true;
}

event_conditions_[event_id] = condition_variable;
wait_set_data_[event_id] = wait_set_data;

return false;
}
Expand All @@ -203,7 +203,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty

std::lock_guard<std::mutex> lock(event_condition_mutex_);

event_conditions_[event_id] = nullptr;
wait_set_data_[event_id] = nullptr;

return event_queues_[event_id].empty();
}
Expand All @@ -220,8 +220,10 @@ void EventsManager::notify_event(rmw_zenoh_event_type_t event_id)
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
event_conditions_[event_id]->notify_one();
if (wait_set_data_[event_id] != nullptr) {
std::lock_guard<std::mutex> wait_set_lock(wait_set_data_[event_id]->condition_mutex);
wait_set_data_[event_id]->triggered = true;
wait_set_data_[event_id]->condition_variable.notify_one();
}
}
} // namespace rmw_zenoh_cpp
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "rmw/event.h"
#include "rmw/event_callback_type.h"

#include "rmw_wait_set_data.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================
Expand Down Expand Up @@ -135,7 +137,7 @@ class EventsManager
/// @param condition_variable to attach.
bool queue_has_data_and_attach_condition_if_not(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable);
rmw_wait_set_data_t * wait_set_data);

/// @brief Detach the condition variable provided by rmw_wait.
bool detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id);
Expand All @@ -153,7 +155,7 @@ class EventsManager
/// Mutex to lock for event_condition.
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
rmw_wait_set_data_t * wait_set_data_[ZENOH_EVENT_ID_MAX + 1]{nullptr};

rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
Expand Down
20 changes: 13 additions & 7 deletions rmw_zenoh_cpp/src/detail/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <mutex>

#include "guard_condition.hpp"
#include "rmw_wait_set_data.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================
GuardCondition::GuardCondition()
: has_triggered_(false),
condition_variable_(nullptr)
wait_set_data_(nullptr)
{
}

Expand All @@ -34,28 +37,31 @@ void GuardCondition::trigger()
// be called
has_triggered_ = true;

if (condition_variable_ != nullptr) {
condition_variable_->notify_one();
if (wait_set_data_ != nullptr) {
std::lock_guard<std::mutex> wait_set_lock(wait_set_data_->condition_mutex);
wait_set_data_->triggered = true;
wait_set_data_->condition_variable.notify_one();
}
}

///=============================================================================
bool GuardCondition::check_and_attach_condition_if_not(std::condition_variable * condition_variable)
bool GuardCondition::check_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data)
{
std::lock_guard<std::mutex> lock(internal_mutex_);
if (has_triggered_) {
return true;
}
condition_variable_ = condition_variable;

wait_set_data_ = wait_set_data;

return false;
}

///=============================================================================
bool GuardCondition::detach_condition_and_trigger_set()
bool GuardCondition::detach_condition_and_is_trigger_set()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
condition_variable_ = nullptr;
wait_set_data_ = nullptr;

bool ret = has_triggered_;

Expand Down
8 changes: 5 additions & 3 deletions rmw_zenoh_cpp/src/detail/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <condition_variable>
#include <mutex>

#include "rmw_wait_set_data.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================
Expand All @@ -31,14 +33,14 @@ class GuardCondition final
// Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set.
void trigger();

bool check_and_attach_condition_if_not(std::condition_variable * condition_variable);
bool check_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data);

bool detach_condition_and_trigger_set();
bool detach_condition_and_is_trigger_set();

private:
mutable std::mutex internal_mutex_;
std::atomic_bool has_triggered_;
std::condition_variable * condition_variable_;
rmw_wait_set_data_t * wait_set_data_;
};
} // namespace rmw_zenoh_cpp

Expand Down
36 changes: 21 additions & 15 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ size_t rmw_publisher_data_t::get_next_sequence_number()

///=============================================================================
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
std::condition_variable * condition_variable)
rmw_wait_set_data_t * wait_set_data)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!message_queue_.empty()) {
return true;
}

condition_ = condition_variable;
wait_set_data_ = wait_set_data;

return false;
}
Expand All @@ -100,16 +100,18 @@ bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
void rmw_subscription_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
if (wait_set_data_ != nullptr) {
std::lock_guard<std::mutex> wait_set_lock(wait_set_data_->condition_mutex);
wait_set_data_->triggered = true;
wait_set_data_->condition_variable.notify_one();
}
}

///=============================================================================
bool rmw_subscription_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
wait_set_data_ = nullptr;

return message_queue_.empty();
}
Expand Down Expand Up @@ -183,13 +185,13 @@ void rmw_subscription_data_t::add_new_message(

///=============================================================================
bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not(
std::condition_variable * condition_variable)
rmw_wait_set_data_t * wait_set_data)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!query_queue_.empty()) {
return true;
}
condition_ = condition_variable;
wait_set_data_ = wait_set_data;

return false;
}
Expand All @@ -198,7 +200,7 @@ bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not(
bool rmw_service_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
wait_set_data_ = nullptr;

return query_queue_.empty();
}
Expand All @@ -221,8 +223,10 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
if (wait_set_data_ != nullptr) {
std::lock_guard<std::mutex> wait_set_lock(wait_set_data_->condition_mutex);
wait_set_data_->triggered = true;
wait_set_data_->condition_variable.notify_one();
}
}

Expand Down Expand Up @@ -312,8 +316,10 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(
void rmw_client_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
if (wait_set_data_ != nullptr) {
std::lock_guard<std::mutex> wait_set_lock(wait_set_data_->condition_mutex);
wait_set_data_->triggered = true;
wait_set_data_->condition_variable.notify_one();
}
}

Expand Down Expand Up @@ -342,13 +348,13 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)

///=============================================================================
bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not(
std::condition_variable * condition_variable)
rmw_wait_set_data_t * wait_set_data)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!reply_queue_.empty()) {
return true;
}
condition_ = condition_variable;
wait_set_data_ = wait_set_data;

return false;
}
Expand All @@ -357,7 +363,7 @@ bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not(
bool rmw_client_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
wait_set_data_ = nullptr;

return reply_queue_.empty();
}
Expand Down
22 changes: 7 additions & 15 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "event.hpp"
#include "graph_cache.hpp"
#include "message_type_support.hpp"
#include "rmw_wait_set_data.hpp"
#include "service_type_support.hpp"

/// Structs for various type erased data fields.
Expand Down Expand Up @@ -128,15 +129,6 @@ class rmw_publisher_data_t final
size_t sequence_number_{1};
};

///=============================================================================
struct rmw_wait_set_data_t
{
std::condition_variable condition_variable;
std::mutex condition_mutex;

rmw_context_t * context;
};

///=============================================================================
// z_owned_closure_sample_t
void sub_data_handler(const z_sample_t * sample, void * sub_data);
Expand Down Expand Up @@ -181,7 +173,7 @@ class rmw_subscription_data_t final
MessageTypeSupport * type_support;
rmw_context_t * context;

bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);
bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data);

bool detach_condition_and_queue_is_empty();

Expand All @@ -202,7 +194,7 @@ class rmw_subscription_data_t final

void notify();

std::condition_variable * condition_{nullptr};
rmw_wait_set_data_t * wait_set_data_{nullptr};
std::mutex condition_mutex_;
};

Expand Down Expand Up @@ -253,7 +245,7 @@ class rmw_service_data_t final

rmw_context_t * context;

bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);
bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data);

bool detach_condition_and_queue_is_empty();

Expand All @@ -279,7 +271,7 @@ class rmw_service_data_t final
std::unordered_map<size_t, SequenceToQuery> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

std::condition_variable * condition_{nullptr};
rmw_wait_set_data_t * wait_set_data_{nullptr};
std::mutex condition_mutex_;
};

Expand Down Expand Up @@ -329,7 +321,7 @@ class rmw_client_data_t final

void add_new_reply(std::unique_ptr<rmw_zenoh_cpp::ZenohReply> reply);

bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);
bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data);

bool detach_condition_and_queue_is_empty();

Expand All @@ -343,7 +335,7 @@ class rmw_client_data_t final
size_t sequence_number_{1};
std::mutex sequence_number_mutex_;

std::condition_variable * condition_{nullptr};
rmw_wait_set_data_t * wait_set_data_{nullptr};
std::mutex condition_mutex_;

std::deque<std::unique_ptr<rmw_zenoh_cpp::ZenohReply>> reply_queue_;
Expand Down
37 changes: 37 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_wait_set_data.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__RMW_WAIT_SET_DATA_HPP_
#define DETAIL__RMW_WAIT_SET_DATA_HPP_

#include <condition_variable>
#include <mutex>

#include "rmw/rmw.h"

namespace rmw_zenoh_cpp
{

struct rmw_wait_set_data_t
{
std::condition_variable condition_variable;
std::mutex condition_mutex;

bool triggered{false};

rmw_context_t * context;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__RMW_WAIT_SET_DATA_HPP_
Loading

0 comments on commit f9ee9ed

Please sign in to comment.