diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index 8691c3ce..ca18ca30 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -158,12 +158,7 @@ class EventsManager mutable std::mutex event_condition_mutex_; /// Condition variable to attach for event notifications. std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; - /// User callback that can be set via data_callback_mgr.set_callback(). - rmw_event_callback_t callback_ {nullptr}; - /// User data that should be passed to the user callback. - const void * user_data_ {nullptr}; - /// Count for - size_t unread_count_ {0}; + rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr}; size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0}; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index dce43cd5..78d5b412 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -88,7 +88,6 @@ static const char PUB_STR[] = "MP"; static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; -static const char EMPTY_NAMESPACE_REPLACEMENT = '_'; static const char KEYEXPR_DELIMITER = '/'; static const char SLASH_REPLACEMENT = '%'; static const char QOS_DELIMITER = ':'; @@ -261,7 +260,7 @@ Entity::Entity( keyexpr_parts[KeyexprIndex::Id] = id_; keyexpr_parts[KeyexprIndex::EntityStr] = entity_to_str.at(type_); // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". - // Hence we add an "_" to denote an empty namespace such that splitting the key + // Hence we mangle the empty namespace such that splitting the key // will always result in 5 parts. keyexpr_parts[KeyexprIndex::Namespace] = mangle_name(node_info_.ns_); keyexpr_parts[KeyexprIndex::NodeName] = mangle_name(node_info_.name_); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 895124b3..446fe13f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -31,6 +31,24 @@ #include "attachment_helpers.hpp" #include "rmw_data_types.hpp" +///============================================================================= +static size_t hash_gid(const uint8_t * gid) +{ + std::stringstream hash_str; + hash_str << std::hex; + size_t i = 0; + for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) { + hash_str << static_cast(gid[i]); + } + return std::hash{}(hash_str.str()); +} + +///============================================================================= +static size_t hash_gid(const rmw_request_id_t & request_id) +{ + return hash_gid(request_id.writer_guid); +} + ///============================================================================= size_t rmw_context_impl_s::get_next_entity_id() { @@ -135,7 +153,24 @@ void rmw_subscription_data_t::add_new_message( } } - // TODO(Yadunund): Check for ZENOH_EVENT_MESSAGE_LOST. + // Check for messages lost if the new sequence number is not monotonically increasing. + const size_t gid_hash = hash_gid(msg->publisher_gid); + auto last_known_pub_it = last_known_published_msg_.find(gid_hash); + if (last_known_pub_it != last_known_published_msg_.end()) { + const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second); + if (seq_increment > 1) { + const size_t num_msg_lost = seq_increment - 1; + total_messages_lost_ += num_msg_lost; + auto event_status = std::make_unique(); + event_status->total_count_change = num_msg_lost; + event_status->total_count = total_messages_lost_; + events_mgr.add_new_event( + rmw_zenoh_cpp::ZENOH_EVENT_MESSAGE_LOST, + std::move(event_status)); + } + } + // Always update the last known sequence number for the publisher + last_known_published_msg_[gid_hash] = msg->sequence_number; message_queue_.emplace_back(std::move(msg)); @@ -211,17 +246,6 @@ void rmw_service_data_t::add_new_query(std::unique_ptr query) notify(); } -static size_t hash_gid(const rmw_request_id_t & request_id) -{ - std::stringstream hash_str; - hash_str << std::hex; - size_t i = 0; - for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) { - hash_str << static_cast(request_id.writer_guid[i]); - } - return std::hash{}(hash_str.str()); -} - ///============================================================================= bool rmw_service_data_t::add_to_query_map( const rmw_request_id_t & request_id, std::unique_ptr query) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 285c00a9..a43af5d3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -191,6 +191,10 @@ class rmw_subscription_data_t final std::deque> message_queue_; mutable std::mutex message_queue_mutex_; + // Map GID of a publisher to the sequence number of the message it published. + std::unordered_map last_known_published_msg_; + size_t total_messages_lost_{0}; + void notify(); std::condition_variable * condition_{nullptr};