Skip to content

Commit

Permalink
Combine the check and attach loops.
Browse files Browse the repository at this point in the history
This is mostly a performance optimization.  We notice that
we *always* attempt to detach the condition, regardless of
whether we've attached it to begin with.  We can use that
to our advantage, in that during the initial iteration
to check if something has triggered, we can also attach
at the same time.  This saves us from having to iterate
over the wait set again later on.  There is a downside
here in that we may end up attaching things that we
didn't need, but we also only do that up until something
is ready.  So this is probably a performance win in the
usual cases.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jun 21, 2024
1 parent 7609e41 commit f182cd8
Showing 1 changed file with 20 additions and 73 deletions.
93 changes: 20 additions & 73 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3136,12 +3136,13 @@ rmw_destroy_wait_set(rmw_wait_set_t * wait_set)
return RMW_RET_OK;
}

static bool has_triggered_condition(
static bool check_and_attach_condition(
const rmw_subscriptions_t * const subscriptions,
const rmw_guard_conditions_t * const guard_conditions,
const rmw_services_t * const services,
const rmw_clients_t * const clients,
const rmw_events_t * const events)
const rmw_events_t * const events,
rmw_zenoh_cpp::rmw_wait_set_data_t * wait_set_data)
{
if (guard_conditions) {
for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) {
Expand All @@ -3151,26 +3152,31 @@ static bool has_triggered_condition(
if (gc->has_triggered()) {
return true;
}

gc->attach_condition(&wait_set_data->condition_variable);
}
}
}

if (events) {
for (size_t i = 0; i < events->event_count; ++i) {
auto event = static_cast<rmw_event_t *>(events->events[i]);
const rmw_event_type_t & event_type = event->event_type;
// Check if the event queue for this event type is empty.
auto zenoh_event_it = rmw_zenoh_cpp::event_map.find(event_type);
auto zenoh_event_it = rmw_zenoh_cpp::event_map.find(event->event_type);
if (zenoh_event_it != rmw_zenoh_cpp::event_map.end()) {
auto event_data = static_cast<rmw_zenoh_cpp::EventsManager *>(event->data);
if (event_data != nullptr) {
if (!event_data->event_queue_is_empty(zenoh_event_it->second)) {
return true;
}

event_data->attach_event_condition(
zenoh_event_it->second,
&wait_set_data->condition_variable);
}
} else {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"has_triggered_condition() called with unknown event %u. Report this bug.", event_type);
"has_triggered_condition() called with unknown event %u. Report this bug.",
event->event_type);
}
}
}
Expand All @@ -3183,6 +3189,8 @@ static bool has_triggered_condition(
if (!sub_data->message_queue_is_empty()) {
return true;
}

sub_data->attach_condition(&wait_set_data->condition_variable);
}
}
}
Expand All @@ -3194,6 +3202,8 @@ static bool has_triggered_condition(
if (!serv_data->query_queue_is_empty()) {
return true;
}

serv_data->attach_condition(&wait_set_data->condition_variable);
}
}
}
Expand All @@ -3206,6 +3216,8 @@ static bool has_triggered_condition(
if (!client_data->reply_queue_is_empty()) {
return true;
}

client_data->attach_condition(&wait_set_data->condition_variable);
}
}
}
Expand Down Expand Up @@ -3253,76 +3265,11 @@ rmw_wait(
// signals to the upper layers that it isn't ready. If something is ready, then we leave it as
// a valid pointer.

bool skip_wait = has_triggered_condition(
subscriptions, guard_conditions, services, clients, events);
bool skip_wait = check_and_attach_condition(
subscriptions, guard_conditions, services, clients, events, wait_set_data);
bool wait_result = true;

if (!skip_wait) {
if (guard_conditions) {
// Attach the wait set condition variable to each guard condition.
// That way they can wake it up if they are triggered while we are waiting.
for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) {
// This is hard to track down, but each of the (void *) pointers in
// guard_conditions->guard_conditions points to the data field of the related
// rmw_guard_condition_t. So we can directly cast it to GuardCondition.
rmw_zenoh_cpp::GuardCondition * gc =
static_cast<rmw_zenoh_cpp::GuardCondition *>(guard_conditions->guard_conditions[i]);
if (gc != nullptr) {
gc->attach_condition(&wait_set_data->condition_variable);
}
}
}

if (subscriptions) {
// Attach the wait set condition variable to each subscription.
// That way they can wake it up if they are triggered while we are waiting.
for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
auto sub_data =
static_cast<rmw_zenoh_cpp::rmw_subscription_data_t *>(subscriptions->subscribers[i]);
if (sub_data != nullptr) {
sub_data->attach_condition(&wait_set_data->condition_variable);
}
}
}

if (services) {
// Attach the wait set condition variable to each service.
// That way they can wake it up if they are triggered while we are waiting.
for (size_t i = 0; i < services->service_count; ++i) {
auto serv_data = static_cast<rmw_zenoh_cpp::rmw_service_data_t *>(services->services[i]);
if (serv_data != nullptr) {
serv_data->attach_condition(&wait_set_data->condition_variable);
}
}
}

if (clients) {
// Attach the wait set condition variable to each client.
// That way they can wake it up if they are triggered while we are waiting.
for (size_t i = 0; i < clients->client_count; ++i) {
rmw_zenoh_cpp::rmw_client_data_t * client_data =
static_cast<rmw_zenoh_cpp::rmw_client_data_t *>(clients->clients[i]);
if (client_data != nullptr) {
client_data->attach_condition(&wait_set_data->condition_variable);
}
}
}

if (events) {
for (size_t i = 0; i < events->event_count; ++i) {
auto event = static_cast<rmw_event_t *>(events->events[i]);
auto event_data = static_cast<rmw_zenoh_cpp::EventsManager *>(event->data);
if (event_data != nullptr) {
auto zenoh_event_it = rmw_zenoh_cpp::event_map.find(event->event_type);
if (zenoh_event_it != rmw_zenoh_cpp::event_map.end()) {
event_data->attach_event_condition(
zenoh_event_it->second,
&wait_set_data->condition_variable);
}
}
}
}

std::unique_lock<std::mutex> lock(wait_set_data->condition_mutex);

// According to the RMW documentation, if wait_timeout is NULL that means
Expand Down

0 comments on commit f182cd8

Please sign in to comment.