Skip to content

Commit

Permalink
Call event callbacks once an event is queued
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 31, 2024
1 parent 7852715 commit 1d9651a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 46 deletions.
41 changes: 25 additions & 16 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
void EventsBase::set_user_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock_mutex(event_mutex_);
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);

if (callback_) {
// Push events arrived before setting the the executor callback.
Expand All @@ -50,7 +50,7 @@ void EventsBase::set_user_callback(
void EventsBase::trigger_user_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::mutex> lock_event_mutex(event_mutex_);
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
Expand All @@ -66,12 +66,13 @@ void EventsBase::event_set_callback(
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_mutex_);
std::lock_guard<std::recursive_mutex> lock(event_mutex_);

// Set the user callback data
event_callback_[event_id] = callback;
Expand All @@ -90,12 +91,13 @@ void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_mutex_);
std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_callback_[event_id] != nullptr) {
event_callback_[event_id](event_data_[event_id], 1);
Expand All @@ -110,12 +112,13 @@ bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return true;
}

std::lock_guard<std::mutex> lock(event_mutex_);
std::lock_guard<std::recursive_mutex> lock(event_mutex_);

return event_queues_[event_id].empty();
}
Expand All @@ -126,12 +129,13 @@ std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
}

std::lock_guard<std::mutex> lock(event_mutex_);
std::lock_guard<std::recursive_mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
Expand All @@ -152,12 +156,13 @@ void EventsBase::add_new_event(
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}

std::lock_guard<std::mutex> lock(event_mutex_);
std::lock_guard<std::recursive_mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
Expand All @@ -174,7 +179,8 @@ void EventsBase::add_new_event(

event_queue.emplace_back(std::move(event));

// Since we added new data, trigger the event guard condition if it is available.
// Since we added new data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}

Expand All @@ -185,7 +191,8 @@ void EventsBase::attach_event_condition(
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}
Expand All @@ -199,7 +206,8 @@ void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}
Expand All @@ -213,7 +221,8 @@ void EventsBase::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.",
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return;
}
Expand Down
12 changes: 6 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ class EventsBase
rmw_event_callback_t callback,
const void * user_data);

/// @brief Trigger the callback for an event.
/// @param event_id the event id whose callback should be triggered.
void trigger_event_callback(rmw_zenoh_event_type_t event_id);

/// @brief Returns true if the event queue is empty.
/// @param event_id the event id whose event queue should be checked.
bool event_queue_is_empty(rmw_zenoh_event_type_t event_id) const;
Expand All @@ -102,11 +98,16 @@ class EventsBase
void detach_event_condition(rmw_zenoh_event_type_t event_id);

private:
/// @brief Trigger the callback for an event.
/// @param event_id the event id whose callback should be triggered.
void trigger_event_callback(rmw_zenoh_event_type_t event_id);

/// Notify once event is added to an event queue.
void notify_event(rmw_zenoh_event_type_t event_id);

/// Mutex to lock when read/writing members.
mutable std::mutex event_mutex_;
// The mutex is recursive as add_new_event() invokes `trigger_event_callback()`.
mutable std::recursive_mutex event_mutex_;
/// Mutex to lock for event_condition.
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
Expand Down Expand Up @@ -179,7 +180,6 @@ class rmw_publisher_data_t : public EventsBase

// Context for memory allocation for messages.
rmw_context_t * context;

};

///=============================================================================
Expand Down
24 changes: 0 additions & 24 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,30 +116,6 @@ rmw_event_set_callback(
callback,
user_data);

// switch (zenoh_event_it->second) {
// case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: {
// rmw_subscription_data_t * sub_data = static_cast<rmw_subscription_data_t *>(rmw_event->data);
// RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);
// sub_data->event_set_callback(
// zenoh_event_it->second,
// callback,
// user_data);
// break;
// }
// case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: {
// rmw_publisher_data_t * pub_data = static_cast<rmw_publisher_data_t *>(rmw_event->data);
// RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT);
// pub_data->event_set_callback(
// zenoh_event_it->second,
// callback,
// user_data);
// break;
// }
// default: {
// return RMW_RET_INVALID_ARGUMENT;
// }
// }

return RMW_RET_OK;
}

Expand Down

0 comments on commit 1d9651a

Please sign in to comment.