Skip to content

Commit

Permalink
Fix how total_count and total_count_change are calculated for matched…
Browse files Browse the repository at this point in the history
… events (ros2#287)

* Fix how total_count and change are calculated

Signed-off-by: Yadunund <[email protected]>

* Ensure key expressions match

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored and YuanYuYuan committed Sep 30, 2024
1 parent 73d68af commit 16916cb
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 25 deletions.
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct rmw_zenoh_event_status_t
size_t total_count;
size_t total_count_change;
size_t current_count;
size_t current_count_change;
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;

Expand Down Expand Up @@ -97,7 +97,7 @@ class DataCallbackManager
size_t unread_count_ {0};
};

/// Base class to be inherited by entities that support events.
/// A class to manage QoS related events.
class EventsManager
{
public:
Expand Down
55 changes: 34 additions & 21 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,20 @@ void GraphCache::handle_matched_events_for_put(
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
if (is_pub) {
// Count the number of matching subs for each set of qos settings.
if (!topic_data_ptr->subs_.empty()) {
match_count_for_entity += topic_data_ptr->subs_.size();
}
match_count_for_entity += topic_data_ptr->subs_.size();
// Also iterate through the subs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
}
}
}
// Update event counters for the new entity->
Expand All @@ -237,17 +240,20 @@ void GraphCache::handle_matched_events_for_put(
} else {
// Entity is a sub.
// Count the number of matching pubs for each set of qos settings.
if (!topic_data_ptr->pubs_.empty()) {
match_count_for_entity += topic_data_ptr->pubs_.size();
}
match_count_for_entity += topic_data_ptr->pubs_.size();
// Also iterate through the pubs to check if any are local and if update event counters.
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
// Update counters only if key expressions match.
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
}
}
// Update event counters for the new entity->
Expand Down Expand Up @@ -307,7 +313,7 @@ void GraphCache::handle_matched_events_for_del(
}

///=============================================================================
void GraphCache::take_entities_with_events(EntityEventMap & entities_with_events)
void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events)
{
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
Expand Down Expand Up @@ -1262,6 +1268,13 @@ void GraphCache::set_qos_event_callback(
event_cb_it->second[event_type] = std::move(callback);
}

///=============================================================================
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
event_callbacks_.erase(entity);
}

///=============================================================================
bool GraphCache::is_entity_local(const liveliness::Entity & entity) const
{
Expand Down Expand Up @@ -1302,8 +1315,8 @@ void GraphCache::update_event_counters(
}

rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id];
status_to_update.total_count += std::abs(change);
status_to_update.total_count_change += std::abs(change);
status_to_update.total_count += std::max(0, change);
status_to_update.total_count_change += std::max(0, change);
status_to_update.current_count += change;
status_to_update.current_count_change = change;
}
Expand Down
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class GraphCache final
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

/// Remove all qos event callbacks for an entity.
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);

/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

Expand Down Expand Up @@ -246,7 +249,7 @@ class GraphCache final

using EntityEventMap =
std::unordered_map<liveliness::ConstEntityPtr, std::unordered_set<rmw_zenoh_event_type_t>>;
void take_entities_with_events(EntityEventMap & entities_with_events);
void take_entities_with_events(const EntityEventMap & entities_with_events);

std::string zid_str_;
/*
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ rmw_event_set_callback(
return RMW_RET_ERROR;
}

// Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase.
// Both rmw_subscription_data_t and rmw_publisher_data_t store an EventsManager object.
rmw_zenoh_cpp::EventsManager * event_data =
static_cast<rmw_zenoh_cpp::EventsManager *>(rmw_event->data);
RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT);
Expand Down
11 changes: 11 additions & 0 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
Expand Down Expand Up @@ -718,6 +722,10 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
RMW_SET_ERROR_MSG("failed to undeclare pub");
ret = RMW_RET_ERROR;
}

// Remove any event callbacks registered to this publisher.
context_impl->graph_cache->remove_qos_event_callbacks(publisher_data->entity);

RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, );
allocator->deallocate(publisher_data, allocator->state);
}
Expand Down Expand Up @@ -1584,6 +1592,9 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
context_impl->graph_cache->remove_querying_subscriber_callback(sub_data);
}

// Remove any event callbacks registered to this subscription.
context_impl->graph_cache->remove_qos_event_callbacks(sub_data->entity);

RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );
allocator->deallocate(sub_data, allocator->state);
}
Expand Down

0 comments on commit 16916cb

Please sign in to comment.