From 1d9651a0d705ed1dc818ef70bf13970b6aef079a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 31 Jan 2024 16:07:26 +0800 Subject: [PATCH] Call event callbacks once an event is queued Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 41 +++++++++++++-------- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 12 +++--- rmw_zenoh_cpp/src/rmw_event.cpp | 24 ------------ 3 files changed, 31 insertions(+), 46 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index bef61de1..a44625c6 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -30,7 +30,7 @@ void EventsBase::set_user_callback( const void * user_data, rmw_event_callback_t callback) { - std::lock_guard lock_mutex(event_mutex_); + std::lock_guard lock_mutex(event_mutex_); if (callback_) { // Push events arrived before setting the the executor callback. @@ -50,7 +50,7 @@ void EventsBase::set_user_callback( void EventsBase::trigger_user_callback() { // Trigger the user provided event callback if available. - std::lock_guard lock_event_mutex(event_mutex_); + std::lock_guard lock_mutex(event_mutex_); if (callback_ != nullptr) { callback_(user_data_, 1); } else { @@ -66,12 +66,13 @@ void EventsBase::event_set_callback( { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return; } - std::lock_guard lock(event_mutex_); + std::lock_guard lock(event_mutex_); // Set the user callback data event_callback_[event_id] = callback; @@ -90,12 +91,13 @@ void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return; } - std::lock_guard lock(event_mutex_); + std::lock_guard lock(event_mutex_); if (event_callback_[event_id] != nullptr) { event_callback_[event_id](event_data_[event_id], 1); @@ -110,12 +112,13 @@ bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return true; } - std::lock_guard lock(event_mutex_); + std::lock_guard lock(event_mutex_); return event_queues_[event_id].empty(); } @@ -126,12 +129,13 @@ std::unique_ptr EventsBase::pop_next_event( { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return nullptr; } - std::lock_guard lock(event_mutex_); + std::lock_guard lock(event_mutex_); if (event_queues_[event_id].empty()) { // This tells rcl that the check for a new events was done, but no events have come in yet. @@ -152,12 +156,13 @@ void EventsBase::add_new_event( { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return; } - std::lock_guard lock(event_mutex_); + std::lock_guard lock(event_mutex_); std::deque> & event_queue = event_queues_[event_id]; if (event_queue.size() >= event_queue_depth_) { @@ -174,7 +179,8 @@ void EventsBase::add_new_event( event_queue.emplace_back(std::move(event)); - // Since we added new data, trigger the event guard condition if it is available. + // Since we added new data, trigger event callback and guard condition if they are available + trigger_event_callback(event_id); notify_event(event_id); } @@ -185,7 +191,8 @@ void EventsBase::attach_event_condition( { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return; } @@ -199,7 +206,8 @@ void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return; } @@ -213,7 +221,8 @@ void EventsBase::notify_event(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. " + "Report this bug.", event_id); return; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index dff46d17..5f6a2eb9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -73,10 +73,6 @@ class EventsBase rmw_event_callback_t callback, const void * user_data); - /// @brief Trigger the callback for an event. - /// @param event_id the event id whose callback should be triggered. - void trigger_event_callback(rmw_zenoh_event_type_t event_id); - /// @brief Returns true if the event queue is empty. /// @param event_id the event id whose event queue should be checked. bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const; @@ -102,11 +98,16 @@ class EventsBase void detach_event_condition(rmw_zenoh_event_type_t event_id); private: + /// @brief Trigger the callback for an event. + /// @param event_id the event id whose callback should be triggered. + void trigger_event_callback(rmw_zenoh_event_type_t event_id); + /// Notify once event is added to an event queue. void notify_event(rmw_zenoh_event_type_t event_id); /// Mutex to lock when read/writing members. - mutable std::mutex event_mutex_; + // The mutex is recursive as add_new_event() invokes `trigger_event_callback()`. + mutable std::recursive_mutex event_mutex_; /// Mutex to lock for event_condition. mutable std::mutex event_condition_mutex_; /// Condition variable to attach for event notifications. @@ -179,7 +180,6 @@ class rmw_publisher_data_t : public EventsBase // Context for memory allocation for messages. rmw_context_t * context; - }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 167bb8e2..ec137cd2 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -116,30 +116,6 @@ rmw_event_set_callback( callback, user_data); - // switch (zenoh_event_it->second) { - // case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { - // rmw_subscription_data_t * sub_data = static_cast(rmw_event->data); - // RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - // sub_data->event_set_callback( - // zenoh_event_it->second, - // callback, - // user_data); - // break; - // } - // case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { - // rmw_publisher_data_t * pub_data = static_cast(rmw_event->data); - // RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - // pub_data->event_set_callback( - // zenoh_event_it->second, - // callback, - // user_data); - // break; - // } - // default: { - // return RMW_RET_INVALID_ARGUMENT; - // } - // } - return RMW_RET_OK; }