diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 809f0624..1785a4cd 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -27,6 +27,7 @@ find_package(zenohc REQUIRED) add_library(rmw_zenoh_cpp SHARED src/detail/attachment_helpers.cpp + src/detail/cdr.cpp src/detail/event.cpp src/detail/identifier.cpp src/detail/graph_cache.cpp diff --git a/rmw_zenoh_cpp/src/detail/cdr.cpp b/rmw_zenoh_cpp/src/detail/cdr.cpp new file mode 100644 index 00000000..0d8ec267 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/cdr.cpp @@ -0,0 +1,42 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "fastcdr/Cdr.h" +#include "fastcdr/FastBuffer.h" +#include "fastcdr/config.h" + +#include "cdr.hpp" + +rmw_zenoh_cpp::Cdr::Cdr(eprosima::fastcdr::FastBuffer & fastbuffer) +#if FASTCDR_VERSION_MAJOR == 1 +: cdr_(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR) +#else +: cdr_(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::DDS_CDR) +#endif +{ +} + +size_t rmw_zenoh_cpp::Cdr::get_serialized_data_length() const +{ +#if FASTCDR_VERSION_MAJOR == 1 + return cdr_.getSerializedDataLength(); +#else + return cdr_.get_serialized_data_length(); +#endif +} + +eprosima::fastcdr::Cdr & rmw_zenoh_cpp::Cdr::get_cdr() +{ + return cdr_; +} diff --git a/rmw_zenoh_cpp/src/detail/cdr.hpp b/rmw_zenoh_cpp/src/detail/cdr.hpp new file mode 100644 index 00000000..22ac942d --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/cdr.hpp @@ -0,0 +1,39 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__CDR_HPP_ +#define DETAIL__CDR_HPP_ + +#include "fastcdr/Cdr.h" +#include "fastcdr/FastBuffer.h" + +// A wrapper class to paper over the differences between Fast-CDR v1 and Fast-CDR v2 +namespace rmw_zenoh_cpp +{ +class Cdr final +{ +public: + explicit Cdr(eprosima::fastcdr::FastBuffer & fastbuffer); + + eprosima::fastcdr::Cdr & get_cdr(); + + size_t get_serialized_data_length() const; + +private: + eprosima::fastcdr::Cdr cdr_; +}; + +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__CDR_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp index 6cb83f62..94dbbea2 100644 --- a/rmw_zenoh_cpp/src/detail/event.cpp +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -184,6 +184,7 @@ void EventsManager::add_new_event( ///============================================================================= void EventsManager::attach_event_condition( rmw_zenoh_event_type_t event_id, + std::mutex * condition_mutex, std::condition_variable * condition_variable) { if (event_id > ZENOH_EVENT_ID_MAX) { @@ -194,7 +195,8 @@ void EventsManager::attach_event_condition( return; } - std::lock_guard lock(event_condition_mutex_); + std::lock_guard lock(update_event_condition_mutex_); + event_condition_mutexes_[event_id] = condition_mutex; event_conditions_[event_id] = condition_variable; } @@ -209,7 +211,8 @@ void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id) return; } - std::lock_guard lock(event_condition_mutex_); + std::lock_guard lock(update_event_condition_mutex_); + event_condition_mutexes_[event_id] = nullptr; event_conditions_[event_id] = nullptr; } @@ -224,8 +227,9 @@ void EventsManager::notify_event(rmw_zenoh_event_type_t event_id) return; } - std::lock_guard lock(event_condition_mutex_); + std::lock_guard lock(update_event_condition_mutex_); if (event_conditions_[event_id] != nullptr) { + std::lock_guard cvlk(*event_condition_mutexes_[event_id]); event_conditions_[event_id]->notify_one(); } } diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index a8246e97..8509da05 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -138,6 +138,7 @@ class EventsManager /// @param condition_variable to attach. void attach_event_condition( rmw_zenoh_event_type_t event_id, + std::mutex * condition_mutex, std::condition_variable * condition_variable); /// @brief Detach the condition variable provided by rmw_wait. @@ -154,7 +155,8 @@ class EventsManager /// Mutex to lock when read/writing members. mutable std::mutex event_mutex_; /// Mutex to lock for event_condition. - mutable std::mutex event_condition_mutex_; + mutable std::mutex update_event_condition_mutex_; + std::mutex * event_condition_mutexes_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; /// Condition variable to attach for event notifications. std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; /// User callback that can be set via data_callback_mgr.set_callback(). diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.cpp b/rmw_zenoh_cpp/src/detail/guard_condition.cpp index b850095f..214dff7e 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.cpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.cpp @@ -33,14 +33,18 @@ void GuardCondition::trigger() has_triggered_ = true; if (condition_variable_ != nullptr) { + std::lock_guard cvlk(*condition_mutex_); condition_variable_->notify_one(); } } ///============================================================================== -void GuardCondition::attach_condition(std::condition_variable * condition_variable) +void GuardCondition::attach_condition( + std::mutex * condition_mutex, + std::condition_variable * condition_variable) { std::lock_guard lock(internal_mutex_); + condition_mutex_ = condition_mutex; condition_variable_ = condition_variable; } @@ -48,6 +52,7 @@ void GuardCondition::attach_condition(std::condition_variable * condition_variab void GuardCondition::detach_condition() { std::lock_guard lock(internal_mutex_); + condition_mutex_ = nullptr; condition_variable_ = nullptr; } diff --git a/rmw_zenoh_cpp/src/detail/guard_condition.hpp b/rmw_zenoh_cpp/src/detail/guard_condition.hpp index b556c5f7..ce13bf7e 100644 --- a/rmw_zenoh_cpp/src/detail/guard_condition.hpp +++ b/rmw_zenoh_cpp/src/detail/guard_condition.hpp @@ -29,7 +29,7 @@ class GuardCondition final // Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set. void trigger(); - void attach_condition(std::condition_variable * condition_variable); + void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable); void detach_condition(); @@ -38,7 +38,8 @@ class GuardCondition final private: mutable std::mutex internal_mutex_; std::atomic_bool has_triggered_; - std::condition_variable * condition_variable_; + std::mutex * condition_mutex_{nullptr}; + std::condition_variable * condition_variable_{nullptr}; }; #endif // DETAIL__GUARD_CONDITION_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 1bb4464c..a4b1e5ce 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -319,8 +319,8 @@ std::shared_ptr Entity::make( return std::make_shared( Entity{ zid_to_str(zid), - std::move(nid), - std::move(id), + nid, + id, std::move(type), std::move(node_info), std::move(topic_info)}); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 8f88b75c..11a8dc4e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -62,17 +63,23 @@ size_t rmw_publisher_data_t::get_next_sequence_number() } ///============================================================================= -void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) +void rmw_subscription_data_t::attach_condition( + std::mutex * condition_mutex, + std::condition_variable * condition_variable) { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); + condition_mutex_ = condition_mutex; condition_ = condition_variable; } ///============================================================================= void rmw_subscription_data_t::notify() { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); if (condition_ != nullptr) { + // We also need to take the mutex for the condition_variable; see the comment + // in rmw_wait for more information + std::lock_guard cvlk(*condition_mutex_); condition_->notify_one(); } } @@ -80,7 +87,8 @@ void rmw_subscription_data_t::notify() ///============================================================================= void rmw_subscription_data_t::detach_condition() { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); + condition_mutex_ = nullptr; condition_ = nullptr; } @@ -149,16 +157,20 @@ bool rmw_service_data_t::query_queue_is_empty() const } ///============================================================================= -void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable) +void rmw_service_data_t::attach_condition( + std::mutex * condition_mutex, + std::condition_variable * condition_variable) { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); + condition_mutex_ = condition_mutex; condition_ = condition_variable; } ///============================================================================= void rmw_service_data_t::detach_condition() { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); + condition_mutex_ = nullptr; condition_ = nullptr; } @@ -179,8 +191,11 @@ std::unique_ptr rmw_service_data_t::pop_next_query() ///============================================================================= void rmw_service_data_t::notify() { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); if (condition_ != nullptr) { + // We also need to take the mutex for the condition_variable; see the comment + // in rmw_wait for more information + std::lock_guard cvlk(*condition_mutex_); condition_->notify_one(); } } @@ -208,31 +223,74 @@ void rmw_service_data_t::add_new_query(std::unique_ptr query) notify(); } +static size_t hash_gid(const rmw_request_id_t & request_id) +{ + std::stringstream hash_str; + hash_str << std::hex; + size_t i = 0; + for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) { + hash_str << static_cast(request_id.writer_guid[i]); + } + return std::hash{}(hash_str.str()); +} + ///============================================================================= bool rmw_service_data_t::add_to_query_map( - int64_t sequence_number, std::unique_ptr query) + const rmw_request_id_t & request_id, std::unique_ptr query) { + size_t hash = hash_gid(request_id); + std::lock_guard lock(sequence_to_query_map_mutex_); - if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) { - return false; + + std::unordered_map::iterator it = + sequence_to_query_map_.find(hash); + + if (it == sequence_to_query_map_.end()) { + SequenceToQuery stq; + + sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq))); + + it = sequence_to_query_map_.find(hash); + } else { + // Client already in the map + + if (it->second.find(request_id.sequence_number) != it->second.end()) { + return false; + } } - sequence_to_query_map_.emplace( - std::pair(sequence_number, std::move(query))); + + it->second.insert( + std::make_pair(request_id.sequence_number, std::move(query))); return true; } ///============================================================================= -std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequence_number) +std::unique_ptr rmw_service_data_t::take_from_query_map( + const rmw_request_id_t & request_id) { + size_t hash = hash_gid(request_id); + std::lock_guard lock(sequence_to_query_map_mutex_); - auto query_it = sequence_to_query_map_.find(sequence_number); - if (query_it == sequence_to_query_map_.end()) { + + std::unordered_map::iterator it = sequence_to_query_map_.find(hash); + + if (it == sequence_to_query_map_.end()) { + return nullptr; + } + + SequenceToQuery::iterator query_it = it->second.find(request_id.sequence_number); + + if (query_it == it->second.end()) { return nullptr; } std::unique_ptr query = std::move(query_it->second); - sequence_to_query_map_.erase(query_it); + it->second.erase(query_it); + + if (sequence_to_query_map_[hash].size() == 0) { + sequence_to_query_map_.erase(hash); + } return query; } @@ -240,8 +298,11 @@ std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequ ///============================================================================= void rmw_client_data_t::notify() { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); if (condition_ != nullptr) { + // We also need to take the mutex for the condition_variable; see the comment + // in rmw_wait for more information + std::lock_guard cvlk(*condition_mutex_); condition_->notify_one(); } } @@ -278,16 +339,20 @@ bool rmw_client_data_t::reply_queue_is_empty() const } ///============================================================================= -void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable) +void rmw_client_data_t::attach_condition( + std::mutex * condition_mutex, + std::condition_variable * condition_variable) { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); + condition_mutex_ = condition_mutex; condition_ = condition_variable; } ///============================================================================= void rmw_client_data_t::detach_condition() { - std::lock_guard lock(condition_mutex_); + std::lock_guard lock(update_condition_mutex_); + condition_mutex_ = nullptr; condition_ = nullptr; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 9a37eefc..0a307fd3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -173,7 +172,7 @@ class rmw_subscription_data_t final MessageTypeSupport * type_support; rmw_context_t * context; - void attach_condition(std::condition_variable * condition_variable); + void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable); void detach_condition(); @@ -192,8 +191,9 @@ class rmw_subscription_data_t final void notify(); + std::mutex * condition_mutex_{nullptr}; std::condition_variable * condition_{nullptr}; - std::mutex condition_mutex_; + std::mutex update_condition_mutex_; }; @@ -244,7 +244,7 @@ class rmw_service_data_t final bool query_queue_is_empty() const; - void attach_condition(std::condition_variable * condition_variable); + void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable); void detach_condition(); @@ -252,9 +252,9 @@ class rmw_service_data_t final void add_new_query(std::unique_ptr query); - bool add_to_query_map(int64_t sequence_number, std::unique_ptr query); + bool add_to_query_map(const rmw_request_id_t & request_id, std::unique_ptr query); - std::unique_ptr take_from_query_map(int64_t sequence_number); + std::unique_ptr take_from_query_map(const rmw_request_id_t & request_id); DataCallbackManager data_callback_mgr; @@ -265,12 +265,14 @@ class rmw_service_data_t final std::deque> query_queue_; mutable std::mutex query_queue_mutex_; - // Map to store the sequence_number -> query_id - std::unordered_map> sequence_to_query_map_; + // Map to store the sequence_number (as given by the client) -> ZenohQuery + using SequenceToQuery = std::unordered_map>; + std::unordered_map sequence_to_query_map_; std::mutex sequence_to_query_map_mutex_; + std::mutex * condition_mutex_{nullptr}; std::condition_variable * condition_{nullptr}; - std::mutex condition_mutex_; + std::mutex update_condition_mutex_; }; ///============================================================================= @@ -320,7 +322,7 @@ class rmw_client_data_t final bool reply_queue_is_empty() const; - void attach_condition(std::condition_variable * condition_variable); + void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable); void detach_condition(); @@ -334,8 +336,9 @@ class rmw_client_data_t final size_t sequence_number_{1}; std::mutex sequence_number_mutex_; + std::mutex * condition_mutex_{nullptr}; std::condition_variable * condition_{nullptr}; - std::mutex condition_mutex_; + std::mutex update_condition_mutex_; std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 08c90b87..d2245704 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include @@ -29,6 +28,7 @@ #include #include "detail/attachment_helpers.hpp" +#include "detail/cdr.hpp" #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" @@ -912,20 +912,17 @@ rmw_publish( eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); // Object that serializes the data - eprosima::fastcdr::Cdr ser( - fastbuffer, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr ser(fastbuffer); if (!publisher_data->type_support->serialize_ros_message( ros_message, - ser, + ser.get_cdr(), publisher_data->type_support_impl)) { RMW_SET_ERROR_MSG("could not serialize ROS message"); return RMW_RET_ERROR; } - const size_t data_length = ser.getSerializedDataLength(); + const size_t data_length = ser.get_serialized_data_length(); int64_t sequence_number = publisher_data->get_next_sequence_number(); @@ -1056,9 +1053,8 @@ rmw_publish_serialized_message( eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); - eprosima::fastcdr::Cdr ser( - buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); - if (!ser.jump(serialized_message->buffer_length)) { + rmw_zenoh_cpp::Cdr ser(buffer); + if (!ser.get_cdr().jump(serialized_message->buffer_length)) { RMW_SET_ERROR_MSG("cannot correctly set serialized buffer"); return RMW_RET_ERROR; } @@ -1077,7 +1073,7 @@ rmw_publish_serialized_message( z_bytes_map_drop(z_move(map)); }); - const size_t data_length = ser.getSerializedDataLength(); + const size_t data_length = ser.get_serialized_data_length(); // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions @@ -1161,10 +1157,9 @@ rmw_serialize( eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), data_length); - eprosima::fastcdr::Cdr ser( - buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr ser(buffer); - auto ret = tss.serialize_ros_message(ros_message, ser, callbacks); + auto ret = tss.serialize_ros_message(ros_message, ser.get_cdr(), callbacks); serialized_message->buffer_length = data_length; serialized_message->buffer_capacity = data_length; return ret == true ? RMW_RET_OK : RMW_RET_ERROR; @@ -1188,10 +1183,9 @@ rmw_deserialize( auto tss = MessageTypeSupport(callbacks); eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); - eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr deser(buffer); - auto ret = tss.deserialize_ros_message(deser, ros_message, callbacks); + auto ret = tss.deserialize_ros_message(deser.get_cdr(), ros_message, callbacks); return ret == true ? RMW_RET_OK : RMW_RET_ERROR; } @@ -1662,12 +1656,9 @@ static rmw_ret_t __rmw_take( msg_data->payload.payload.len); // Object that serializes the data - eprosima::fastcdr::Cdr deser( - fastbuffer, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr deser(fastbuffer); if (!sub_data->type_support->deserialize_ros_message( - deser, + deser.get_cdr(), ros_message, sub_data->type_support_impl)) { @@ -2243,19 +2234,16 @@ rmw_send_request( eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); // Object that serializes the data - eprosima::fastcdr::Cdr ser( - fastbuffer, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr ser(fastbuffer); if (!client_data->request_type_support->serialize_ros_message( ros_request, - ser, + ser.get_cdr(), client_data->request_type_support_impl)) { return RMW_RET_ERROR; } - size_t data_length = ser.getSerializedDataLength(); + size_t data_length = ser.get_serialized_data_length(); *sequence_id = client_data->get_next_sequence_number(); @@ -2339,12 +2327,9 @@ rmw_take_response( sample->payload.len); // Object that serializes the data - eprosima::fastcdr::Cdr deser( - fastbuffer, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr deser(fastbuffer); if (!client_data->response_type_support->deserialize_ros_message( - deser, + deser.get_cdr(), ros_response, client_data->response_type_support_impl)) { @@ -2788,12 +2773,9 @@ rmw_take_request( payload_value.payload.len); // Object that serializes the data - eprosima::fastcdr::Cdr deser( - fastbuffer, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr deser(fastbuffer); if (!service_data->request_type_support->deserialize_ros_message( - deser, + deser.get_cdr(), ros_request, service_data->request_type_support_impl)) { @@ -2829,9 +2811,7 @@ rmw_take_request( request_header->received_timestamp = now_ns.count(); // Add this query to the map, so that rmw_send_response can quickly look it up later - if (!service_data->add_to_query_map( - request_header->request_id.sequence_number, std::move(query))) - { + if (!service_data->add_to_query_map(request_header->request_id, std::move(query))) { RMW_SET_ERROR_MSG("duplicate sequence number in the map"); return RMW_RET_ERROR; } @@ -2867,6 +2847,15 @@ rmw_send_response( rmw_service_data_t * service_data = static_cast(service->data); + // Create the queryable payload + std::unique_ptr query = + service_data->take_from_query_map(*request_header); + if (query == nullptr) { + // If there is no data associated with this request, the higher layers of + // ROS 2 seem to expect that we just silently return with no work. + return RMW_RET_OK; + } + rcutils_allocator_t * allocator = &(service_data->context->options.allocator); size_t max_data_length = ( @@ -2878,7 +2867,7 @@ rmw_send_response( max_data_length, allocator->state)); if (!response_bytes) { - RMW_SET_ERROR_MSG("failed allocate response message bytes"); + RMW_SET_ERROR_MSG("failed to allocate response message bytes"); return RMW_RET_ERROR; } auto free_response_bytes = rcpputils::make_scope_exit( @@ -2890,27 +2879,16 @@ rmw_send_response( eprosima::fastcdr::FastBuffer fastbuffer(response_bytes, max_data_length); // Object that serializes the data - eprosima::fastcdr::Cdr ser( - fastbuffer, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); + rmw_zenoh_cpp::Cdr ser(fastbuffer); if (!service_data->response_type_support->serialize_ros_message( ros_response, - ser, + ser.get_cdr(), service_data->response_type_support_impl)) { return RMW_RET_ERROR; } - size_t data_length = ser.getSerializedDataLength(); - - // Create the queryable payload - std::unique_ptr query = - service_data->take_from_query_map(request_header->sequence_number); - if (query == nullptr) { - RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug."); - return RMW_RET_ERROR; - } + size_t data_length = ser.get_serialized_data_length(); const z_query_t loaned_query = query->get_query(); z_query_reply_options_t options = z_query_reply_options_default(); @@ -3261,7 +3239,7 @@ rmw_wait( // rmw_guard_condition_t. So we can directly cast it to GuardCondition. GuardCondition * gc = static_cast(guard_conditions->guard_conditions[i]); if (gc != nullptr) { - gc->attach_condition(&wait_set_data->condition_variable); + gc->attach_condition(&wait_set_data->condition_mutex, &wait_set_data->condition_variable); } } } @@ -3272,7 +3250,9 @@ rmw_wait( for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { auto sub_data = static_cast(subscriptions->subscribers[i]); if (sub_data != nullptr) { - sub_data->attach_condition(&wait_set_data->condition_variable); + sub_data->attach_condition( + &wait_set_data->condition_mutex, + &wait_set_data->condition_variable); } } } @@ -3283,7 +3263,9 @@ rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { auto serv_data = static_cast(services->services[i]); if (serv_data != nullptr) { - serv_data->attach_condition(&wait_set_data->condition_variable); + serv_data->attach_condition( + &wait_set_data->condition_mutex, + &wait_set_data->condition_variable); } } } @@ -3294,7 +3276,9 @@ rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { rmw_client_data_t * client_data = static_cast(clients->clients[i]); if (client_data != nullptr) { - client_data->attach_condition(&wait_set_data->condition_variable); + client_data->attach_condition( + &wait_set_data->condition_mutex, + &wait_set_data->condition_variable); } } } @@ -3308,6 +3292,7 @@ rmw_wait( if (zenoh_event_it != event_map.end()) { event_data->attach_event_condition( zenoh_event_it->second, + &wait_set_data->condition_mutex, &wait_set_data->condition_variable); } } @@ -3316,16 +3301,24 @@ rmw_wait( std::unique_lock lock(wait_set_data->condition_mutex); - // According to the RMW documentation, if wait_timeout is NULL that means - // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait - // for that amount of time. - if (wait_timeout == nullptr) { - wait_set_data->condition_variable.wait(lock); - } else { - if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { - std::cv_status wait_status = wait_set_data->condition_variable.wait_for( - lock, std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); - wait_result = wait_status == std::cv_status::no_timeout; + // We have to check the triggered condition *again* under the lock so we + // don't miss notifications. + skip_wait = has_triggered_condition( + subscriptions, guard_conditions, services, clients, events); + + if (!skip_wait) { + // According to the RMW documentation, if wait_timeout is NULL that means + // "wait forever", if it specified by 0 it means "never wait", and if it is anything else wait + // for that amount of time. + if (wait_timeout == nullptr) { + wait_set_data->condition_variable.wait(lock); + } else { + if (wait_timeout->sec != 0 || wait_timeout->nsec != 0) { + std::cv_status wait_status = wait_set_data->condition_variable.wait_for( + lock, + std::chrono::nanoseconds(wait_timeout->nsec + RCUTILS_S_TO_NS(wait_timeout->sec))); + wait_result = wait_status == std::cv_status::no_timeout; + } } } } @@ -3438,7 +3431,7 @@ rmw_get_node_names( } //============================================================================== -/// Return the name, namespae, and enclave name of all nodes in the ROS graph. +/// Return the name, namespace, and enclave name of all nodes in the ROS graph. rmw_ret_t rmw_get_node_names_with_enclaves( const rmw_node_t * node,