diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index ceccbe82..36f931f1 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -411,8 +411,8 @@ void GraphCache::parse_put( { auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_); if (sub_cbs_it != querying_subs_cbs_.end()) { - for (const auto & cb : sub_cbs_it->second) { - cb(entity->zid()); + for (auto sub_it = sub_cbs_it->second.begin(); sub_it != sub_cbs_it->second.end(); ++sub_it) { + sub_it->second(entity->zid()); } } } @@ -1331,15 +1331,29 @@ std::unique_ptr GraphCache::take_event_status( ///============================================================================= void GraphCache::set_querying_subscriber_callback( - const std::string & keyexpr, + const rmw_subscription_data_t * sub_data, QueryingSubscriberCallback cb) { + const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_; auto cb_it = querying_subs_cbs_.find(keyexpr); if (cb_it == querying_subs_cbs_.end()) { - querying_subs_cbs_[keyexpr] = std::move(std::vector{}); + querying_subs_cbs_[keyexpr] = std::move( + std::unordered_map{}); cb_it = querying_subs_cbs_.find(keyexpr); } - cb_it->second.push_back(std::move(cb)); + cb_it->second.insert(std::make_pair(sub_data, std::move(cb))); +} + +///============================================================================= +void GraphCache::remove_querying_subscriber_callback( + const rmw_subscription_data_t * sub_data) +{ + auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_); + if (cb_map_it == querying_subs_cbs_.end()) { + return; + } + cb_map_it->second.erase(sub_data); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index e38b3262..bd380402 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -36,6 +36,11 @@ namespace rmw_zenoh_cpp { +// Forward declare to prevent circular dependency. +// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of +// rmw_data_types.hpp. +class rmw_subscription_data_t; + ///============================================================================= // TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the // array corresponds to the EntityType enum. This way we don't need to mix @@ -184,9 +189,12 @@ class GraphCache final static bool is_entity_pub(const liveliness::Entity & entity); void set_querying_subscriber_callback( - const std::string & keyexpr, + const rmw_subscription_data_t * sub_data, QueryingSubscriberCallback cb); + void remove_querying_subscriber_callback( + const rmw_subscription_data_t * sub_data); + private: // Helper function to convert an Entity into a GraphNode. // Note: this will update bookkeeping variables in GraphCache. @@ -286,7 +294,8 @@ class GraphCache final // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. GraphEventCallbackMap event_callbacks_; // Map keyexpressions to QueryingSubscriberCallback. - std::unordered_map> querying_subs_cbs_; + std::unordered_map> querying_subs_cbs_; // Counters to track changes to event statues for each topic. std::unordered_map> event_statuses_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index f5ebb79d..f608c2ac 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1441,7 +1441,7 @@ rmw_create_subscription( // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. context_impl->graph_cache->set_querying_subscriber_callback( - sub_data->entity->topic_info()->topic_keyexpr_, + sub_data, [sub_data](const std::string & queryable_prefix) -> void { if (sub_data == nullptr) { @@ -1537,6 +1537,10 @@ rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_s * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -1574,6 +1578,8 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_SET_ERROR_MSG("failed to undeclare sub"); ret = RMW_RET_ERROR; } + // Also remove the registered callback from the GraphCache. + context_impl->graph_cache->remove_querying_subscriber_callback(sub_data); } RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );