diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 56f7b49d..19d30327 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -87,7 +87,7 @@ zenoh::Bytes AttachmentData::serialize_to_zbytes() AttachmentData::AttachmentData(const zenoh::Bytes & bytes) { - zenoh::ext::Deserializer deserializer(std::move(bytes)); + zenoh::ext::Deserializer deserializer(bytes); const auto sequence_number_str = deserializer.deserialize(); if (sequence_number_str != "sequence_number") { throw std::runtime_error("sequence_number is not found in the attachment."); diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index e3e63b4a..98522613 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -141,7 +141,7 @@ std::shared_ptr ClientData::make( response_type_support }); - if (!client_data->init(session)) { + if (!client_data->init()) { // init() already set the error. return nullptr; } @@ -176,14 +176,13 @@ ClientData::ClientData( } ///============================================================================= -bool ClientData::init(const std::shared_ptr session) +bool ClientData::init() { std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; keyexpr_ = zenoh::KeyExpr(topic_keyexpr); - // TODO(ahcorde) check KeyExpr std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); zenoh::ZResult result; - this->token_ = session->liveliness_declare_token( + this->token_ = sess_->liveliness_declare_token( zenoh::KeyExpr(liveliness_keyexpr), zenoh::Session::LivelinessDeclarationOptions::create_default(), &result); @@ -194,7 +193,6 @@ bool ClientData::init(const std::shared_ptr session) return false; } - sess_ = session; initialized_ = true; return true; diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index fbe75c30..d31ac343 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -115,7 +115,7 @@ class ClientData final : public std::enable_shared_from_this std::shared_ptr response_type_support); // Initialize the Zenoh objects for this entity. - bool init(const std::shared_ptr session); + bool init(); // Internal mutex. mutable std::recursive_mutex mutex_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 36257d21..327499e3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -204,7 +204,7 @@ class rmw_context_impl_s::Data final zenoh::Session::LivelinessSubscriberOptions sub_options = zenoh::Session::LivelinessSubscriberOptions::create_default(); sub_options.history = true; - graph_subscriber_cpp_ = session_->liveliness_declare_subscriber( + graph_subscriber_ = session_->liveliness_declare_subscriber( keyexpr_cpp, [&](const zenoh::Sample & sample) { // Look up the data shared_ptr in the global map. If it is in there, use it. @@ -244,7 +244,7 @@ class rmw_context_impl_s::Data final } zenoh::ZResult result; - std::move(graph_subscriber_cpp_).value().undeclare(&result); + std::move(graph_subscriber_).value().undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -418,7 +418,16 @@ class rmw_context_impl_s::Data final // Graph cache. std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. - std::optional> graph_subscriber_cpp_; + // The graph_subscriber *must* exist in order for anything in this Data class, + // and hence rmw_zenoh_cpp, to work. + // However, zenoh::Subscriber does not have an empty constructor, + // so just declaring this as a zenoh::Subscriber fails to compile. + // We work around that by wrapping it in a std::optional, + // so the std::optional gets constructed at Data constructor time, + // and then we initialize graph_subscriber_ later. Note that the zenoh-cpp API + // liveliness_declare_subscriber() throws an exception if it fails, + // so this should all be safe to do. + std::optional> graph_subscriber_; // Equivalent to rmw_dds_common::Context's guard condition. // Guard condition that should be triggered when the graph changes. std::unique_ptr graph_guard_condition_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 219b466d..dbba7581 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -226,6 +226,7 @@ rmw_ret_t PublisherData::publish( } }); + // TODO(anyone): Move this zenoh_cpp API // Get memory from SHM buffer if available. // if (shm_provider.has_value()) { // RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); @@ -280,10 +281,10 @@ rmw_ret_t PublisherData::publish( entity_->copy_gid()); // TODO(ahcorde): shmbuf - std::vector raw_image( + std::vector raw_data( reinterpret_cast(msg_bytes), reinterpret_cast(msg_bytes) + data_length); - zenoh::Bytes payload(raw_image); + zenoh::Bytes payload(std::move(raw_data)); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) { @@ -323,10 +324,10 @@ rmw_ret_t PublisherData::publish_serialized_message( auto options = zenoh::Publisher::PutOptions::create_default(); options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid()); - std::vector raw_image( + std::vector raw_data( serialized_message->buffer, serialized_message->buffer + data_length); - zenoh::Bytes payload(raw_image); + zenoh::Bytes payload(std::move(raw_data)); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index d24c965f..30954901 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -326,9 +326,10 @@ rmw_ret_t ServiceData::take_request( return RMW_RET_ERROR; } + auto writter_gid_v = attachment.copy_gid(); memcpy( request_header->request_id.writer_guid, - attachment.copy_gid().data(), + writter_gid_v.data(), RMW_GID_STORAGE_SIZE); request_header->source_timestamp = attachment.source_timestamp(); @@ -339,7 +340,7 @@ rmw_ret_t ServiceData::take_request( request_header->received_timestamp = query->get_received_timestamp(); // Add this query to the map, so that rmw_send_response can quickly look it up later. - const size_t hash = rmw_zenoh_cpp::hash_gid_p(request_header->request_id.writer_guid); + const size_t hash = rmw_zenoh_cpp::hash_gid(writter_gid_v); std::unordered_map::iterator it = sequence_to_query_map_.find(hash); if (it == sequence_to_query_map_.end()) { SequenceToQuery stq; @@ -378,8 +379,13 @@ rmw_ret_t ServiceData::send_response( ); return RMW_RET_OK; } + + std::vector writer_guid; + writer_guid.resize(RMW_GID_STORAGE_SIZE); + memcpy(writer_guid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE); + // Create the queryable payload - const size_t hash = hash_gid_p(request_id->writer_guid); + const size_t hash = hash_gid(writer_guid); std::unordered_map::iterator it = sequence_to_query_map_.find(hash); if (it == sequence_to_query_map_.end()) { // If there is no data associated with this request, the higher layers of diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index 53a77830..eeeeee6c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -117,8 +117,24 @@ class ServiceData final : public std::enable_shared_from_this // The keyexpr string. std::string keyexpr_; // An owned queryable. + // The Queryable *must* exist in order for anything in this ServiceData class, + // and hence rmw_zenoh_cpp, to work. + // However, zenoh::Queryable does not have an empty constructor, + // so just declaring this as a zenoh::Queryable fails to compile. + // We work around that by wrapping it in a std::optional, so the std::optional + // gets constructed at ServiceData constructor time, + // and then we initialize qable_ later. Note that the zenoh-cpp API declare_queryable() throws an + // exception if it fails, so this should all be safe to do. std::optional> qable_; // Liveliness token for the service. + // The token_ *must* exist in order for anything in this ServiceData class, + // and hence rmw_zenoh_cpp, to work. + // However, zenoh::LivelinessToken does not have an empty constructor, + // so just declaring this as a zenoh::LivelinessToken fails to compile. + // We work around that by wrapping it in a std::optional, so the std::optional + // gets constructed at ServiceData constructor time, + // and then we initialize token_ later. Note that the zenoh-cpp API + // liveliness_declare_token() throws an exception if it fails, so this should all be safe to do. std::optional token_; // Type support fields. const void * request_type_support_impl_;