From 589e03119d2637fc0d9e9b7f19ddfed3b92f4300 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 17 Sep 2024 12:35:18 +0800 Subject: [PATCH 1/2] Remove querying sub cb when subscription is destroyed Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 23 ++++++++++++++++++----- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 13 +++++++++++-- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 8 +++++++- 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index e27acef6..a1844643 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -412,8 +412,8 @@ void GraphCache::parse_put( { auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_); if (sub_cbs_it != querying_subs_cbs_.end()) { - for (const auto & cb : sub_cbs_it->second) { - cb(entity->zid()); + for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) { + sub_it->second(entity->zid()); } } } @@ -1332,15 +1332,28 @@ std::unique_ptr GraphCache::take_event_status( ///============================================================================= void GraphCache::set_querying_subscriber_callback( - const std::string & keyexpr, + const rmw_subscription_data_t * sub_data, QueryingSubscriberCallback cb) { + const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_; auto cb_it = querying_subs_cbs_.find(keyexpr); if (cb_it == querying_subs_cbs_.end()) { - querying_subs_cbs_[keyexpr] = std::move(std::vector{}); + querying_subs_cbs_[keyexpr] = std::move(std::unordered_map{}); cb_it = querying_subs_cbs_.find(keyexpr); } - cb_it->second.push_back(std::move(cb)); + cb_it->second.insert(std::make_pair(sub_data, std::move(cb))); +} + +///============================================================================= +void GraphCache::remove_querying_subscriber_callback( + const rmw_subscription_data_t * sub_data) +{ + auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_); + if (cb_map_it == querying_subs_cbs_.end()) { + return; + } + cb_map_it->second.erase(sub_data); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 664f2461..f98f38bf 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -38,6 +38,11 @@ namespace rmw_zenoh_cpp { +// Forward declare to prevent circular dependency. +// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of +// rmw_data_types.hpp. +class rmw_subscription_data_t; + ///============================================================================= // TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the // array corresponds to the EntityType enum. This way we don't need to mix @@ -186,9 +191,12 @@ class GraphCache final static bool is_entity_pub(const liveliness::Entity & entity); void set_querying_subscriber_callback( - const std::string & keyexpr, + const rmw_subscription_data_t * sub_data, QueryingSubscriberCallback cb); + void remove_querying_subscriber_callback( + const rmw_subscription_data_t * sub_data); + private: // Helper function to convert an Entity into a GraphNode. // Note: this will update bookkeeping variables in GraphCache. @@ -288,7 +296,8 @@ class GraphCache final // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. GraphEventCallbackMap event_callbacks_; // Map keyexpressions to QueryingSubscriberCallback. - std::unordered_map> querying_subs_cbs_; + std::unordered_map> querying_subs_cbs_; // Counters to track changes to event statues for each topic. std::unordered_map> event_statuses_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 05232a7c..577cb34e 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1493,7 +1493,7 @@ rmw_create_subscription( // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. context_impl->graph_cache->set_querying_subscriber_callback( - sub_data->entity->topic_info()->topic_keyexpr_, + sub_data, [sub_data](const std::string & queryable_prefix) -> void { if (sub_data == nullptr) { @@ -1589,6 +1589,10 @@ rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -1626,6 +1630,8 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_SET_ERROR_MSG("failed to undeclare sub"); ret = RMW_RET_ERROR; } + // Also remove the registered callback from the GraphCache. + context_impl->graph_cache->remove_querying_subscriber_callback(sub_data); } RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); From 11517a26d117131ea12e70c7dd95dbffee6f8f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Hern=C3=A1ndez=20Cordero?= Date: Tue, 17 Sep 2024 13:17:30 +0200 Subject: [PATCH 2/2] Fix uncrustify MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Alejandro Hernández Cordero --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index a1844643..dc651479 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -1338,8 +1338,9 @@ void GraphCache::set_querying_subscriber_callback( const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_; auto cb_it = querying_subs_cbs_.find(keyexpr); if (cb_it == querying_subs_cbs_.end()) { - querying_subs_cbs_[keyexpr] = std::move(std::unordered_map{}); + querying_subs_cbs_[keyexpr] = std::move( + std::unordered_map{}); cb_it = querying_subs_cbs_.find(keyexpr); } cb_it->second.insert(std::make_pair(sub_data, std::move(cb)));