Skip to content

Commit

Permalink
Improve performance for waits on subs, clients, services, and guards. (
Browse files Browse the repository at this point in the history
…#207)

* Improve performance for waits on subs, clients, services, and guards.

Basically by combining the check/attach and the detach/check,
we can go from locking 4 times per entity to locking twice
per entity, which is a large savings here.

Signed-off-by: Chris Lalancette <[email protected]>
Co-authored-by: Yadu <[email protected]>
  • Loading branch information
clalancette and Yadunund authored Jun 24, 2024
1 parent 08704af commit 6df0e28
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 152 deletions.
34 changes: 14 additions & 20 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,6 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
return;
}

///=============================================================================
bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return true;
}

std::lock_guard<std::mutex> lock(event_mutex_);

return event_queues_[event_id].empty();
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_type_t event_id)
Expand Down Expand Up @@ -183,7 +167,7 @@ void EventsManager::add_new_event(
}

///=============================================================================
void EventsManager::attach_event_condition(
bool EventsManager::queue_has_data_and_attach_condition_if_not(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
Expand All @@ -192,26 +176,36 @@ void EventsManager::attach_event_condition(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
return false;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);

if (!event_queues_[event_id].empty()) {
return true;
}

event_conditions_[event_id] = condition_variable;

return false;
}

///=============================================================================
void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
return true;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);

event_conditions_[event_id] = nullptr;

return event_queues_[event_id].empty();
}

///=============================================================================
Expand Down
8 changes: 2 additions & 6 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ class EventsManager
rmw_event_callback_t callback,
const void * user_data);

/// @brief Returns true if the event queue is empty.
/// @param event_id the event id whose event queue should be checked.
bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const;

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
Expand All @@ -137,12 +133,12 @@ class EventsManager

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
void attach_event_condition(
bool queue_has_data_and_attach_condition_if_not(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable);

/// @brief Detach the condition variable provided by rmw_wait.
void detach_event_condition(rmw_zenoh_event_type_t event_id);
bool detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id);

private:
/// @brief Trigger the callback for an event.
Expand Down
20 changes: 7 additions & 13 deletions rmw_zenoh_cpp/src/detail/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,23 @@ void GuardCondition::trigger()
}

///=============================================================================
void GuardCondition::attach_condition(std::condition_variable * condition_variable)
bool GuardCondition::check_and_attach_condition_if_not(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(internal_mutex_);
if (has_triggered_) {
return true;
}
condition_variable_ = condition_variable;

return false;
}

///=============================================================================
void GuardCondition::detach_condition()
bool GuardCondition::detach_condition_and_trigger_set()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
condition_variable_ = nullptr;
}

bool GuardCondition::has_triggered() const
{
std::lock_guard<std::mutex> lock(internal_mutex_);
return has_triggered_;
}

///=============================================================================
bool GuardCondition::get_and_reset_trigger()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
bool ret = has_triggered_;

has_triggered_ = false;
Expand Down
8 changes: 2 additions & 6 deletions rmw_zenoh_cpp/src/detail/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@ class GuardCondition final
// Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set.
void trigger();

void attach_condition(std::condition_variable * condition_variable);
bool check_and_attach_condition_if_not(std::condition_variable * condition_variable);

void detach_condition();

bool has_triggered() const;

bool get_and_reset_trigger();
bool detach_condition_and_trigger_set();

private:
mutable std::mutex internal_mutex_;
Expand Down
55 changes: 29 additions & 26 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,17 @@ size_t rmw_publisher_data_t::get_next_sequence_number()
}

///=============================================================================
void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!message_queue_.empty()) {
return true;
}

condition_ = condition_variable;

return false;
}

///=============================================================================
Expand All @@ -99,16 +106,11 @@ void rmw_subscription_data_t::notify()
}

///=============================================================================
void rmw_subscription_data_t::detach_condition()
bool rmw_subscription_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

///=============================================================================
bool rmw_subscription_data_t::message_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
return message_queue_.empty();
}

Expand Down Expand Up @@ -180,24 +182,25 @@ void rmw_subscription_data_t::add_new_message(
}

///=============================================================================
bool rmw_service_data_t::query_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
return query_queue_.empty();
}

///=============================================================================
void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not(
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!query_queue_.empty()) {
return true;
}
condition_ = condition_variable;

return false;
}

///=============================================================================
void rmw_service_data_t::detach_condition()
bool rmw_service_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;

return query_queue_.empty();
}

///=============================================================================
Expand Down Expand Up @@ -338,25 +341,25 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
}

///=============================================================================
bool rmw_client_data_t::reply_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

return reply_queue_.empty();
}

///=============================================================================
void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not(
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (!reply_queue_.empty()) {
return true;
}
condition_ = condition_variable;

return false;
}

///=============================================================================
void rmw_client_data_t::detach_condition()
bool rmw_client_data_t::detach_condition_and_queue_is_empty()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;

return reply_queue_.empty();
}

///=============================================================================
Expand Down
18 changes: 6 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,9 @@ class rmw_subscription_data_t final
MessageTypeSupport * type_support;
rmw_context_t * context;

void attach_condition(std::condition_variable * condition_variable);
bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);

void detach_condition();

bool message_queue_is_empty() const;
bool detach_condition_and_queue_is_empty();

std::unique_ptr<saved_msg_data> pop_next_message();

Expand Down Expand Up @@ -255,11 +253,9 @@ class rmw_service_data_t final

rmw_context_t * context;

bool query_queue_is_empty() const;

void attach_condition(std::condition_variable * condition_variable);
bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);

void detach_condition();
bool detach_condition_and_queue_is_empty();

std::unique_ptr<ZenohQuery> pop_next_query();

Expand Down Expand Up @@ -333,11 +329,9 @@ class rmw_client_data_t final

void add_new_reply(std::unique_ptr<rmw_zenoh_cpp::ZenohReply> reply);

bool reply_queue_is_empty() const;

void attach_condition(std::condition_variable * condition_variable);
bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);

void detach_condition();
bool detach_condition_and_queue_is_empty();

std::unique_ptr<rmw_zenoh_cpp::ZenohReply> pop_next_reply();

Expand Down
Loading

0 comments on commit 6df0e28

Please sign in to comment.