From 6df0e281cee7ea888485a63faf41b788078e8b4f Mon Sep 17 00:00:00 2001
From: Chris Lalancette <clalancette@gmail.com>
Date: Mon, 24 Jun 2024 16:05:35 -0400
Subject: [PATCH] Improve performance for waits on subs, clients, services, and
 guards. (#207)

* Improve performance for waits on subs, clients, services, and guards.

Basically by combining the check/attach and the detach/check,
we can go from locking 4 times per entity to locking twice
per entity, which is a large savings here.

Signed-off-by: Chris Lalancette <clalancette@gmail.com>
Co-authored-by: Yadu <yadunund@intrinsic.ai>
---
 rmw_zenoh_cpp/src/detail/event.cpp           |  34 ++---
 rmw_zenoh_cpp/src/detail/event.hpp           |   8 +-
 rmw_zenoh_cpp/src/detail/guard_condition.cpp |  20 +--
 rmw_zenoh_cpp/src/detail/guard_condition.hpp |   8 +-
 rmw_zenoh_cpp/src/detail/rmw_data_types.cpp  |  55 ++++----
 rmw_zenoh_cpp/src/detail/rmw_data_types.hpp  |  18 +--
 rmw_zenoh_cpp/src/rmw_zenoh.cpp              | 129 +++++++++----------
 7 files changed, 120 insertions(+), 152 deletions(-)

diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp
index b40f7048..10d7a618 100644
--- a/rmw_zenoh_cpp/src/detail/event.cpp
+++ b/rmw_zenoh_cpp/src/detail/event.cpp
@@ -103,22 +103,6 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
   return;
 }
 
-///=============================================================================
-bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
-{
-  if (event_id > ZENOH_EVENT_ID_MAX) {
-    RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
-      "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
-      "Report this bug.",
-      event_id);
-    return true;
-  }
-
-  std::lock_guard<std::mutex> lock(event_mutex_);
-
-  return event_queues_[event_id].empty();
-}
-
 ///=============================================================================
 std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
   rmw_zenoh_event_type_t event_id)
@@ -183,7 +167,7 @@ void EventsManager::add_new_event(
 }
 
 ///=============================================================================
-void EventsManager::attach_event_condition(
+bool EventsManager::queue_has_data_and_attach_condition_if_not(
   rmw_zenoh_event_type_t event_id,
   std::condition_variable * condition_variable)
 {
@@ -192,26 +176,36 @@ void EventsManager::attach_event_condition(
       "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
       "Report this bug.",
       event_id);
-    return;
+    return false;
   }
 
   std::lock_guard<std::mutex> lock(event_condition_mutex_);
+
+  if (!event_queues_[event_id].empty()) {
+    return true;
+  }
+
   event_conditions_[event_id] = condition_variable;
+
+  return false;
 }
 
 ///=============================================================================
-void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
+bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id)
 {
   if (event_id > ZENOH_EVENT_ID_MAX) {
     RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
       "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
       "Report this bug.",
       event_id);
-    return;
+    return true;
   }
 
   std::lock_guard<std::mutex> lock(event_condition_mutex_);
+
   event_conditions_[event_id] = nullptr;
+
+  return event_queues_[event_id].empty();
 }
 
 ///=============================================================================
diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp
index ca18ca30..25f5dfe9 100644
--- a/rmw_zenoh_cpp/src/detail/event.hpp
+++ b/rmw_zenoh_cpp/src/detail/event.hpp
@@ -120,10 +120,6 @@ class EventsManager
     rmw_event_callback_t callback,
     const void * user_data);
 
-  /// @brief  Returns true if the event queue is empty.
-  /// @param event_id the event id whose event queue should be checked.
-  bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const;
-
   /// Pop the next event in the queue.
   /// @param event_id the event id whose queue should be popped.
   std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
@@ -137,12 +133,12 @@ class EventsManager
 
   /// @brief Attach the condition variable provided by rmw_wait.
   /// @param condition_variable to attach.
-  void attach_event_condition(
+  bool queue_has_data_and_attach_condition_if_not(
     rmw_zenoh_event_type_t event_id,
     std::condition_variable * condition_variable);
 
   /// @brief Detach the condition variable provided by rmw_wait.
-  void detach_event_condition(rmw_zenoh_event_type_t event_id);
+  bool detach_condition_and_event_queue_is_empty(rmw_zenoh_event_type_t event_id);
 
 private:
   /// @brief Trigger the callback for an event.
diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp
index 76f1c1f8..b9f78740 100644
--- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp
+++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp
@@ -40,29 +40,23 @@ void GuardCondition::trigger()
 }
 
 ///=============================================================================
-void GuardCondition::attach_condition(std::condition_variable * condition_variable)
+bool GuardCondition::check_and_attach_condition_if_not(std::condition_variable * condition_variable)
 {
   std::lock_guard<std::mutex> lock(internal_mutex_);
+  if (has_triggered_) {
+    return true;
+  }
   condition_variable_ = condition_variable;
+
+  return false;
 }
 
 ///=============================================================================
-void GuardCondition::detach_condition()
+bool GuardCondition::detach_condition_and_trigger_set()
 {
   std::lock_guard<std::mutex> lock(internal_mutex_);
   condition_variable_ = nullptr;
-}
-
-bool GuardCondition::has_triggered() const
-{
-  std::lock_guard<std::mutex> lock(internal_mutex_);
-  return has_triggered_;
-}
 
-///=============================================================================
-bool GuardCondition::get_and_reset_trigger()
-{
-  std::lock_guard<std::mutex> lock(internal_mutex_);
   bool ret = has_triggered_;
 
   has_triggered_ = false;
diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp
index 9adb737e..96e3b3bd 100644
--- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp
+++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp
@@ -31,13 +31,9 @@ class GuardCondition final
   // Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set.
   void trigger();
 
-  void attach_condition(std::condition_variable * condition_variable);
+  bool check_and_attach_condition_if_not(std::condition_variable * condition_variable);
 
-  void detach_condition();
-
-  bool has_triggered() const;
-
-  bool get_and_reset_trigger();
+  bool detach_condition_and_trigger_set();
 
 private:
   mutable std::mutex internal_mutex_;
diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
index 0f14b9af..4445d89e 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
@@ -83,10 +83,17 @@ size_t rmw_publisher_data_t::get_next_sequence_number()
 }
 
 ///=============================================================================
-void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
+bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
+  std::condition_variable * condition_variable)
 {
   std::lock_guard<std::mutex> lock(condition_mutex_);
+  if (!message_queue_.empty()) {
+    return true;
+  }
+
   condition_ = condition_variable;
+
+  return false;
 }
 
 ///=============================================================================
@@ -99,16 +106,11 @@ void rmw_subscription_data_t::notify()
 }
 
 ///=============================================================================
-void rmw_subscription_data_t::detach_condition()
+bool rmw_subscription_data_t::detach_condition_and_queue_is_empty()
 {
   std::lock_guard<std::mutex> lock(condition_mutex_);
   condition_ = nullptr;
-}
 
-///=============================================================================
-bool rmw_subscription_data_t::message_queue_is_empty() const
-{
-  std::lock_guard<std::mutex> lock(message_queue_mutex_);
   return message_queue_.empty();
 }
 
@@ -180,24 +182,25 @@ void rmw_subscription_data_t::add_new_message(
 }
 
 ///=============================================================================
-bool rmw_service_data_t::query_queue_is_empty() const
-{
-  std::lock_guard<std::mutex> lock(query_queue_mutex_);
-  return query_queue_.empty();
-}
-
-///=============================================================================
-void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
+bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not(
+  std::condition_variable * condition_variable)
 {
   std::lock_guard<std::mutex> lock(condition_mutex_);
+  if (!query_queue_.empty()) {
+    return true;
+  }
   condition_ = condition_variable;
+
+  return false;
 }
 
 ///=============================================================================
-void rmw_service_data_t::detach_condition()
+bool rmw_service_data_t::detach_condition_and_queue_is_empty()
 {
   std::lock_guard<std::mutex> lock(condition_mutex_);
   condition_ = nullptr;
+
+  return query_queue_.empty();
 }
 
 ///=============================================================================
@@ -338,25 +341,25 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
 }
 
 ///=============================================================================
-bool rmw_client_data_t::reply_queue_is_empty() const
-{
-  std::lock_guard<std::mutex> lock(reply_queue_mutex_);
-
-  return reply_queue_.empty();
-}
-
-///=============================================================================
-void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
+bool rmw_client_data_t::queue_has_data_and_attach_condition_if_not(
+  std::condition_variable * condition_variable)
 {
   std::lock_guard<std::mutex> lock(condition_mutex_);
+  if (!reply_queue_.empty()) {
+    return true;
+  }
   condition_ = condition_variable;
+
+  return false;
 }
 
 ///=============================================================================
-void rmw_client_data_t::detach_condition()
+bool rmw_client_data_t::detach_condition_and_queue_is_empty()
 {
   std::lock_guard<std::mutex> lock(condition_mutex_);
   condition_ = nullptr;
+
+  return reply_queue_.empty();
 }
 
 ///=============================================================================
diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
index 1ae8b736..bfdff21b 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
@@ -181,11 +181,9 @@ class rmw_subscription_data_t final
   MessageTypeSupport * type_support;
   rmw_context_t * context;
 
-  void attach_condition(std::condition_variable * condition_variable);
+  bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);
 
-  void detach_condition();
-
-  bool message_queue_is_empty() const;
+  bool detach_condition_and_queue_is_empty();
 
   std::unique_ptr<saved_msg_data> pop_next_message();
 
@@ -255,11 +253,9 @@ class rmw_service_data_t final
 
   rmw_context_t * context;
 
-  bool query_queue_is_empty() const;
-
-  void attach_condition(std::condition_variable * condition_variable);
+  bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);
 
-  void detach_condition();
+  bool detach_condition_and_queue_is_empty();
 
   std::unique_ptr<ZenohQuery> pop_next_query();
 
@@ -333,11 +329,9 @@ class rmw_client_data_t final
 
   void add_new_reply(std::unique_ptr<rmw_zenoh_cpp::ZenohReply> reply);
 
-  bool reply_queue_is_empty() const;
-
-  void attach_condition(std::condition_variable * condition_variable);
+  bool queue_has_data_and_attach_condition_if_not(std::condition_variable * condition_variable);
 
-  void detach_condition();
+  bool detach_condition_and_queue_is_empty();
 
   std::unique_ptr<rmw_zenoh_cpp::ZenohReply> pop_next_reply();
 
diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp
index 0356be80..c57f73a7 100644
--- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp
+++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp
@@ -3277,12 +3277,11 @@ static bool check_and_attach_condition(
     for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) {
       rmw_zenoh_cpp::GuardCondition * gc =
         static_cast<rmw_zenoh_cpp::GuardCondition *>(guard_conditions->guard_conditions[i]);
-      if (gc != nullptr) {
-        if (gc->has_triggered()) {
-          return true;
-        }
-
-        gc->attach_condition(&wait_set_data->condition_variable);
+      if (gc == nullptr) {
+        continue;
+      }
+      if (gc->check_and_attach_condition_if_not(&wait_set_data->condition_variable)) {
+        return true;
       }
     }
   }
@@ -3294,13 +3293,12 @@ static bool check_and_attach_condition(
       if (zenoh_event_it != rmw_zenoh_cpp::event_map.end()) {
         auto event_data = static_cast<rmw_zenoh_cpp::EventsManager *>(event->data);
         if (event_data != nullptr) {
-          if (!event_data->event_queue_is_empty(zenoh_event_it->second)) {
+          if (event_data->queue_has_data_and_attach_condition_if_not(
+              zenoh_event_it->second,
+              &wait_set_data->condition_variable))
+          {
             return true;
           }
-
-          event_data->attach_event_condition(
-            zenoh_event_it->second,
-            &wait_set_data->condition_variable);
         }
       } else {
         RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
@@ -3314,12 +3312,13 @@ static bool check_and_attach_condition(
     for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
       auto sub_data =
         static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscriptions->subscribers[i]);
-      if (sub_data != nullptr) {
-        if (!sub_data->message_queue_is_empty()) {
-          return true;
-        }
-
-        sub_data->attach_condition(&wait_set_data->condition_variable);
+      if (sub_data == nullptr) {
+        continue;
+      }
+      if (sub_data->queue_has_data_and_attach_condition_if_not(
+          &wait_set_data->condition_variable))
+      {
+        return true;
       }
     }
   }
@@ -3327,12 +3326,13 @@ static bool check_and_attach_condition(
   if (services) {
     for (size_t i = 0; i < services->service_count; ++i) {
       auto serv_data = static_cast<rmw_zenoh_cpp::rmw_service_data_t *>(services->services[i]);
-      if (serv_data != nullptr) {
-        if (!serv_data->query_queue_is_empty()) {
-          return true;
-        }
-
-        serv_data->attach_condition(&wait_set_data->condition_variable);
+      if (serv_data == nullptr) {
+        continue;
+      }
+      if (serv_data->queue_has_data_and_attach_condition_if_not(
+          &wait_set_data->condition_variable))
+      {
+        return true;
       }
     }
   }
@@ -3341,12 +3341,13 @@ static bool check_and_attach_condition(
     for (size_t i = 0; i < clients->client_count; ++i) {
       rmw_zenoh_cpp::rmw_client_data_t * client_data =
         static_cast<rmw_zenoh_cpp::rmw_client_data_t *>(clients->clients[i]);
-      if (client_data != nullptr) {
-        if (!client_data->reply_queue_is_empty()) {
-          return true;
-        }
-
-        client_data->attach_condition(&wait_set_data->condition_variable);
+      if (client_data == nullptr) {
+        continue;
+      }
+      if (client_data->queue_has_data_and_attach_condition_if_not(
+          &wait_set_data->condition_variable))
+      {
+        return true;
       }
     }
   }
@@ -3415,34 +3416,30 @@ rmw_wait(
     }
   }
 
+  // According to the documentation for rmw_wait in rmw.h, entries in the various arrays that have
+  // *not* been triggered should be set to NULL
   if (guard_conditions) {
-    // Now detach the condition variable and mutex from each of the guard conditions
     for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) {
       rmw_zenoh_cpp::GuardCondition * gc =
         static_cast<rmw_zenoh_cpp::GuardCondition *>(guard_conditions->guard_conditions[i]);
-      if (gc != nullptr) {
-        gc->detach_condition();
-        // According to the documentation for rmw_wait in rmw.h, entries in the
-        // array that have *not* been triggered should be set to NULL
-        if (!gc->get_and_reset_trigger()) {
-          guard_conditions->guard_conditions[i] = nullptr;
-        }
+      if (gc == nullptr) {
+        continue;
+      }
+      if (!gc->detach_condition_and_trigger_set()) {
+        // Setting to nullptr lets rcl know that this guard condition is not ready
+        guard_conditions->guard_conditions[i] = nullptr;
       }
     }
   }
 
   if (events) {
-    // Now detach the condition variable and mutex from each of the subscriptions
     for (size_t i = 0; i < events->event_count; ++i) {
       auto event = static_cast<rmw_event_t *>(events->events[i]);
       auto event_data = static_cast<rmw_zenoh_cpp::EventsManager *>(event->data);
       if (event_data != nullptr) {
         auto zenoh_event_it = rmw_zenoh_cpp::event_map.find(event->event_type);
         if (zenoh_event_it != rmw_zenoh_cpp::event_map.end()) {
-          event_data->detach_event_condition(zenoh_event_it->second);
-          // According to the documentation for rmw_wait in rmw.h, entries in the
-          // array that have *not* been triggered should be set to NULL
-          if (event_data->event_queue_is_empty(zenoh_event_it->second)) {
+          if (event_data->detach_condition_and_event_queue_is_empty(zenoh_event_it->second)) {
             // Setting to nullptr lets rcl know that this subscription is not ready
             events->events[i] = nullptr;
           }
@@ -3452,51 +3449,45 @@ rmw_wait(
   }
 
   if (subscriptions) {
-    // Now detach the condition variable and mutex from each of the subscriptions
     for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
       auto sub_data =
         static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscriptions->subscribers[i]);
-      if (sub_data != nullptr) {
-        sub_data->detach_condition();
-        // According to the documentation for rmw_wait in rmw.h, entries in the
-        // array that have *not* been triggered should be set to NULL
-        if (sub_data->message_queue_is_empty()) {
-          // Setting to nullptr lets rcl know that this subscription is not ready
-          subscriptions->subscribers[i] = nullptr;
-        }
+      if (sub_data == nullptr) {
+        continue;
+      }
+
+      if (sub_data->detach_condition_and_queue_is_empty()) {
+        // Setting to nullptr lets rcl know that this subscription is not ready
+        subscriptions->subscribers[i] = nullptr;
       }
     }
   }
 
   if (services) {
-    // Now detach the condition variable and mutex from each of the services
     for (size_t i = 0; i < services->service_count; ++i) {
       auto serv_data = static_cast<rmw_zenoh_cpp::rmw_service_data_t *>(services->services[i]);
-      if (serv_data != nullptr) {
-        serv_data->detach_condition();
-        // According to the documentation for rmw_wait in rmw.h, entries in the
-        // array that have *not* been triggered should be set to NULL
-        if (serv_data->query_queue_is_empty()) {
-          // Setting to nullptr lets rcl know that this service is not ready
-          services->services[i] = nullptr;
-        }
+      if (serv_data == nullptr) {
+        continue;
+      }
+
+      if (serv_data->detach_condition_and_queue_is_empty()) {
+        // Setting to nullptr lets rcl know that this service is not ready
+        services->services[i] = nullptr;
       }
     }
   }
 
   if (clients) {
-    // Now detach the condition variable and mutex from each of the clients
     for (size_t i = 0; i < clients->client_count; ++i) {
       rmw_zenoh_cpp::rmw_client_data_t * client_data =
         static_cast<rmw_zenoh_cpp::rmw_client_data_t *>(clients->clients[i]);
-      if (client_data != nullptr) {
-        client_data->detach_condition();
-        // According to the documentation for rmw_wait in rmw.h, entries in the
-        // array that have *not* been triggered should be set to NULL
-        if (client_data->reply_queue_is_empty()) {
-          // Setting to nullptr lets rcl know that this client is not ready
-          clients->clients[i] = nullptr;
-        }
+      if (client_data == nullptr) {
+        continue;
+      }
+
+      if (client_data->detach_condition_and_queue_is_empty()) {
+        // Setting to nullptr lets rcl know that this client is not ready
+        clients->clients[i] = nullptr;
       }
     }
   }