From 7ea6a70453f4f3bbc2e4bb8eda93cf1057d9e2bd Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 30 Jan 2024 23:03:28 +0800 Subject: [PATCH] Set callbacks Signed-off-by: Yadunund --- .github/workflows/style.yaml | 4 +- rmw_zenoh_cpp/src/detail/event.hpp | 52 ++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 1 + rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 171 +++++++++++++++++++- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 65 ++++++-- rmw_zenoh_cpp/src/rmw_event.cpp | 97 +++++++++-- rmw_zenoh_cpp/src/rmw_qos.cpp | 1 - rmw_zenoh_cpp/src/rmw_zenoh.cpp | 74 +++++---- 8 files changed, 404 insertions(+), 61 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/event.hpp diff --git a/.github/workflows/style.yaml b/.github/workflows/style.yaml index a2a0cb33..5c3a7160 100644 --- a/.github/workflows/style.yaml +++ b/.github/workflows/style.yaml @@ -17,5 +17,7 @@ jobs: timeout-minutes: 30 steps: - uses: actions/checkout@v2 - - name: style + - name: uncrustify run: /ros_entrypoint.sh ament_uncrustify rmw_zenoh_cpp/ + - name: cpplint + run: /ros_entrypoint.sh ament_cpplint rmw_zenoh_cpp/ diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp new file mode 100644 index 00000000..34f55826 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -0,0 +1,52 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__EVENT_HPP_ +#define DETAIL__EVENT_HPP_ + +#include + +#include "rmw/event.h" + +///============================================================================= +// RMW Event types that we support in rmw_zenoh. +enum rmw_zenoh_event_type_t +{ + // sentinel value + ZENOH_EVENT_INVALID, + + // subscription events + ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE, + // RMW_EVENT_MESSAGE_LOST, + // RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, + // RMW_EVENT_SUBSCRIPTION_MATCHED, + + // publisher events + // RMW_EVENT_LIVELINESS_LOST, + // RMW_EVENT_OFFERED_DEADLINE_MISSED, + ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE + // RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, + // RMW_EVENT_PUBLICATION_MATCHED, +}; + +/// Helper value to indicate the maximum index of events supported. +#define ZENOH_EVENT_ID_MAX rmw_zenoh_event_type_t::ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE + +static const std::unordered_map event_map{ + {RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE}, + {RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE}, + // TODO(clalancette): Implement remaining events +}; + +#endif // DETAIL__EVENT_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 6add0f7c..d0a6883f 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -23,6 +23,7 @@ #include #include +#include "event.hpp" #include "liveliness_utils.hpp" #include "rcutils/allocator.h" diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 7effa150..8f604a02 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -22,21 +22,51 @@ #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" +#include "rmw/error_handling.h" + #include "rmw_data_types.hpp" -///============================================================================== +///============================================================================= saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) : payload(p), recv_timestamp(recv_ts) { memcpy(publisher_gid, pub_gid, 16); } +///============================================================================= +void rmw_publisher_data_t::event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", event_id); + return; + } + + std::lock_guard lock(user_callback_data_.mutex); + + // Set the user callback data + user_callback_data_.event_callback[event_id] = callback; + user_callback_data_.event_data[event_id] = user_data; + + if (callback && user_callback_data_.event_unread_count[event_id]) { + // Push events happened before having assigned a callback + callback(user_data, user_callback_data_.event_unread_count[event_id]); + user_callback_data_.event_unread_count[event_id] = 0; + } + return; +} + +///============================================================================= void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); condition_ = condition_variable; } +///============================================================================= void rmw_subscription_data_t::notify() { std::lock_guard lock(condition_mutex_); @@ -45,18 +75,21 @@ void rmw_subscription_data_t::notify() } } +///============================================================================= void rmw_subscription_data_t::detach_condition() { std::lock_guard lock(condition_mutex_); condition_ = nullptr; } +///============================================================================= bool rmw_subscription_data_t::message_queue_is_empty() const { std::lock_guard lock(message_queue_mutex_); return message_queue_.empty(); } +///============================================================================= std::unique_ptr rmw_subscription_data_t::pop_next_message() { std::lock_guard lock(message_queue_mutex_); @@ -72,6 +105,7 @@ std::unique_ptr rmw_subscription_data_t::pop_next_message() return msg_data; } +///============================================================================= void rmw_subscription_data_t::add_new_message( std::unique_ptr msg, const std::string & topic_name) { @@ -95,33 +129,91 @@ void rmw_subscription_data_t::add_new_message( z_drop(z_move(old->payload)); message_queue_.pop_front(); } - } message_queue_.emplace_back(std::move(msg)); + // Trigger the user provided event callback if available. + std::unique_lock lock_event_mutex(user_callback_data_.mutex); + if (user_callback_data_.callback != nullptr) { + user_callback_data_.callback(user_callback_data_.user_data, 1); + } else { + ++user_callback_data_.unread_count; + } + lock_event_mutex.unlock(); + // Since we added new data, trigger the guard condition if it is available notify(); } +///============================================================================= +void rmw_subscription_data_t::set_on_new_message_callback( + const void * user_data, rmw_event_callback_t callback) +{ + std::lock_guard lock_mutex(user_callback_data_.mutex); + + if (callback) { + // Push events arrived before setting the the executor callback. + if (user_callback_data_.unread_count) { + callback(user_data, user_callback_data_.unread_count); + user_callback_data_.unread_count = 0; + } + user_callback_data_.user_data = user_data; + user_callback_data_.callback = callback; + } else { + user_callback_data_.user_data = nullptr; + user_callback_data_.callback = nullptr; + } +} + +///============================================================================= +void rmw_subscription_data_t::event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d].Report this bug.", event_id); + return; + } + + std::lock_guard lock(user_callback_data_.mutex); + + // Set the user callback data + user_callback_data_.event_callback[event_id] = callback; + user_callback_data_.event_data[event_id] = user_data; + + if (callback && user_callback_data_.event_unread_count[event_id]) { + // Push events happened before having assigned a callback + callback(user_data, user_callback_data_.event_unread_count[event_id]); + user_callback_data_.event_unread_count[event_id] = 0; + } + return; +} + +///============================================================================= bool rmw_service_data_t::query_queue_is_empty() const { std::lock_guard lock(query_queue_mutex_); return query_queue_.empty(); } +///============================================================================= void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); condition_ = condition_variable; } +///============================================================================= void rmw_service_data_t::detach_condition() { std::lock_guard lock(condition_mutex_); condition_ = nullptr; } +///============================================================================= std::unique_ptr rmw_service_data_t::pop_next_query() { std::lock_guard lock(query_queue_mutex_); @@ -135,6 +227,7 @@ std::unique_ptr rmw_service_data_t::pop_next_query() return query; } +///============================================================================= void rmw_service_data_t::notify() { std::lock_guard lock(condition_mutex_); @@ -143,15 +236,26 @@ void rmw_service_data_t::notify() } } +///============================================================================= void rmw_service_data_t::add_new_query(std::unique_ptr query) { std::lock_guard lock(query_queue_mutex_); query_queue_.emplace_back(std::move(query)); + // Trigger the user provided event callback if available. + std::unique_lock lock_event_mutex(user_callback_data_.mutex); + if (user_callback_data_.callback != nullptr) { + user_callback_data_.callback(user_callback_data_.user_data, 1); + } else { + ++user_callback_data_.unread_count; + } + lock_event_mutex.unlock(); + // Since we added new data, trigger the guard condition if it is available notify(); } +///============================================================================= bool rmw_service_data_t::add_to_query_map( int64_t sequence_number, std::unique_ptr query) { @@ -165,6 +269,7 @@ bool rmw_service_data_t::add_to_query_map( return true; } +///============================================================================= std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequence_number) { std::lock_guard lock(sequence_to_query_map_mutex_); @@ -179,6 +284,27 @@ std::unique_ptr rmw_service_data_t::take_from_query_map(int64_t sequ return query; } +///============================================================================= +void rmw_service_data_t::set_on_new_request_callback( + const void * user_data, rmw_event_callback_t callback) +{ + std::lock_guard lock_mutex(user_callback_data_.mutex); + + if (callback) { + // Push events arrived before setting the the executor callback. + if (user_callback_data_.unread_count) { + callback(user_data, user_callback_data_.unread_count); + user_callback_data_.unread_count = 0; + } + user_callback_data_.user_data = user_data; + user_callback_data_.callback = callback; + } else { + user_callback_data_.user_data = nullptr; + user_callback_data_.callback = nullptr; + } +} + +///============================================================================= void rmw_client_data_t::notify() { std::lock_guard lock(condition_mutex_); @@ -187,14 +313,25 @@ void rmw_client_data_t::notify() } } +///============================================================================= void rmw_client_data_t::add_new_reply(std::unique_ptr reply) { std::lock_guard lock(reply_queue_mutex_); reply_queue_.emplace_back(std::move(reply)); + // Trigger the user provided event callback if available. + std::unique_lock lock_event_mutex(user_callback_data_.mutex); + if (user_callback_data_.callback != nullptr) { + user_callback_data_.callback(user_callback_data_.user_data, 1); + } else { + ++user_callback_data_.unread_count; + } + lock_event_mutex.unlock(); + notify(); } +///============================================================================= bool rmw_client_data_t::reply_queue_is_empty() const { std::lock_guard lock(reply_queue_mutex_); @@ -202,18 +339,21 @@ bool rmw_client_data_t::reply_queue_is_empty() const return reply_queue_.empty(); } +///============================================================================= void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable) { std::lock_guard lock(condition_mutex_); condition_ = condition_variable; } +///============================================================================= void rmw_client_data_t::detach_condition() { std::lock_guard lock(condition_mutex_); condition_ = nullptr; } +///============================================================================= std::unique_ptr rmw_client_data_t::pop_next_reply() { std::lock_guard lock(reply_queue_mutex_); @@ -228,6 +368,26 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } +///============================================================================= +void rmw_client_data_t::set_on_new_response_callback( + const void * user_data, rmw_event_callback_t callback) +{ + std::lock_guard lock_mutex(user_callback_data_.mutex); + + if (callback) { + // Push events arrived before setting the the executor callback. + if (user_callback_data_.unread_count) { + callback(user_data, user_callback_data_.unread_count); + user_callback_data_.unread_count = 0; + } + user_callback_data_.user_data = user_data; + user_callback_data_.callback = callback; + } else { + user_callback_data_.user_data = nullptr; + user_callback_data_.callback = nullptr; + } +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -256,16 +416,19 @@ void sub_data_handler( sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr)); } +///============================================================================= ZenohQuery::ZenohQuery(const z_query_t * query) { query_ = z_query_clone(query); } +///============================================================================= ZenohQuery::~ZenohQuery() { z_drop(z_move(query_)); } +///============================================================================= const z_query_t ZenohQuery::get_query() const { return z_query_loan(&query_); @@ -294,16 +457,19 @@ void service_data_handler(const z_query_t * query, void * data) service_data->add_new_query(std::make_unique(query)); } +///============================================================================= ZenohReply::ZenohReply(const z_owned_reply_t * reply) { reply_ = *reply; } +///============================================================================= ZenohReply::~ZenohReply() { z_reply_drop(z_move(reply_)); } +///============================================================================= std::optional ZenohReply::get_sample() const { if (z_reply_is_ok(&reply_)) { @@ -313,6 +479,7 @@ std::optional ZenohReply::get_sample() const return std::nullopt; } +///============================================================================= size_t rmw_client_data_t::get_next_sequence_number() { std::lock_guard lock(sequence_number_mutex); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 0100211f..b6e56daf 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -31,6 +31,7 @@ #include "rcutils/allocator.h" +#include "rmw/event_callback_type.h" #include "rmw/rmw.h" #include "graph_cache.hpp" @@ -39,7 +40,19 @@ /// Structs for various type erased data fields. -///============================================================================== +///============================================================================= +struct user_callback_data_t +{ + std::mutex mutex; + rmw_event_callback_t callback {nullptr}; + const void * user_data {nullptr}; + size_t unread_count {0}; + rmw_event_callback_t event_callback[ZENOH_EVENT_ID_MAX + 1] {nullptr}; + const void * event_data[ZENOH_EVENT_ID_MAX + 1] {nullptr}; + size_t event_unread_count[ZENOH_EVENT_ID_MAX + 1] {0}; +}; + +///============================================================================= struct rmw_context_impl_s { // An owned session. @@ -61,7 +74,7 @@ struct rmw_context_impl_s GraphCache graph_cache; }; -///============================================================================== +///============================================================================= struct rmw_node_data_t { // TODO(Yadunund): Do we need a token at the node level? Right now I have one @@ -70,9 +83,10 @@ struct rmw_node_data_t zc_owned_liveliness_token_t token; }; -///============================================================================== -struct rmw_publisher_data_t +///============================================================================= +class rmw_publisher_data_t { +public: // An owned publisher. z_owned_publisher_t pub; @@ -92,9 +106,17 @@ struct rmw_publisher_data_t // Context for memory allocation for messages. rmw_context_t * context; + + void event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data); + +private: + user_callback_data_t user_callback_data_; }; -///============================================================================== +///============================================================================= struct rmw_wait_set_data_t { std::condition_variable condition_variable; @@ -103,7 +125,7 @@ struct rmw_wait_set_data_t rmw_context_t * context; }; -///============================================================================== +///============================================================================= // z_owned_closure_sample_t void sub_data_handler(const z_sample_t * sample, void * sub_data); @@ -116,7 +138,7 @@ struct saved_msg_data uint8_t publisher_gid[16]; }; -///============================================================================== +///============================================================================= class rmw_subscription_data_t final { public: @@ -144,6 +166,13 @@ class rmw_subscription_data_t final void add_new_message(std::unique_ptr msg, const std::string & topic_name); + void set_on_new_message_callback(const void * user_data, rmw_event_callback_t callback); + + void event_set_callback( + rmw_zenoh_event_type_t event_id, + rmw_event_callback_t callback, + const void * user_data); + private: std::deque> message_queue_; mutable std::mutex message_queue_mutex_; @@ -152,17 +181,18 @@ class rmw_subscription_data_t final std::condition_variable * condition_{nullptr}; std::mutex condition_mutex_; -}; + user_callback_data_t user_callback_data_; +}; -///============================================================================== +///============================================================================= void service_data_handler(const z_query_t * query, void * service_data); +///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * client_data); -///============================================================================== - +///============================================================================= class ZenohQuery final { public: @@ -176,6 +206,7 @@ class ZenohQuery final z_owned_query_t query_; }; +///============================================================================= class rmw_service_data_t final { public: @@ -211,6 +242,8 @@ class rmw_service_data_t final std::unique_ptr take_from_query_map(int64_t sequence_number); + void set_on_new_request_callback(const void * user_data, rmw_event_callback_t callback); + private: void notify(); @@ -224,10 +257,11 @@ class rmw_service_data_t final std::condition_variable * condition_{nullptr}; std::mutex condition_mutex_; -}; -///============================================================================== + user_callback_data_t user_callback_data_; +}; +///============================================================================= class ZenohReply final { public: @@ -241,6 +275,7 @@ class ZenohReply final z_owned_reply_t reply_; }; +///============================================================================= class rmw_client_data_t final { public: @@ -276,6 +311,8 @@ class rmw_client_data_t final std::unique_ptr pop_next_reply(); + void set_on_new_response_callback(const void * user_data, rmw_event_callback_t callback); + private: void notify(); @@ -287,6 +324,8 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; + + user_callback_data_t user_callback_data_; }; #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index e7e44a4b..bbfcc253 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -12,22 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include "rmw/error_handling.h" #include "rmw/event.h" #include "rmw/types.h" +#include "detail/event.hpp" #include "detail/identifier.hpp" +#include "detail/rmw_data_types.hpp" extern "C" { -static const std::unordered_map event_map{ - // TODO(clalancette): Implement some events -}; - ///============================================================================== -/// Initialize a rmw subscription event +/// Initialize a rmw publisher event rmw_ret_t rmw_publisher_event_init( rmw_event_t * rmw_event, @@ -38,6 +34,8 @@ rmw_publisher_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher->implementation_identifier, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); if (publisher->implementation_identifier != rmw_zenoh_identifier) { RMW_SET_ERROR_MSG("Publisher implementation identifier not from this implementation"); @@ -51,7 +49,7 @@ rmw_publisher_event_init( } rmw_event->implementation_identifier = publisher->implementation_identifier; - rmw_event->data = publisher->data; + rmw_event->data = pub_data; rmw_event->event_type = event_type; return RMW_RET_OK; @@ -69,6 +67,8 @@ rmw_subscription_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->implementation_identifier, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT); + rmw_subscription_data_t * sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); if (subscription->implementation_identifier != rmw_zenoh_identifier) { RMW_SET_ERROR_MSG( @@ -83,12 +83,56 @@ rmw_subscription_event_init( } rmw_event->implementation_identifier = subscription->implementation_identifier; - rmw_event->data = subscription->data; + rmw_event->data = sub_data; rmw_event->event_type = event_type; return RMW_RET_OK; } +//============================================================================== +/// Set the callback function for the event. +rmw_ret_t +rmw_event_set_callback( + rmw_event_t * rmw_event, + rmw_event_callback_t callback, + const void * user_data) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(rmw_event, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(rmw_event->data, RMW_RET_INVALID_ARGUMENT); + + auto zenoh_event_it = event_map.find(rmw_event->event_type); + if (zenoh_event_it == event_map.end()) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("RMW Zenoh does not support event [%d]", rmw_event->event_type); + return RMW_RET_ERROR; + } + + switch (zenoh_event_it->second) { + case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { + rmw_subscription_data_t * sub_data = static_cast(rmw_event->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + sub_data->event_set_callback( + zenoh_event_it->second, + callback, + user_data); + break; + } + case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { + rmw_publisher_data_t * pub_data = static_cast(rmw_event->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + pub_data->event_set_callback( + zenoh_event_it->second, + callback, + user_data); + break; + } + default: { + return RMW_RET_INVALID_ARGUMENT; + } + } + + return RMW_RET_OK; +} + ///============================================================================== rmw_ret_t rmw_take_event( @@ -100,20 +144,43 @@ rmw_take_event( RMW_CHECK_ARGUMENT_FOR_NULL(event_info, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + *taken = false; + if (event_handle->implementation_identifier != rmw_zenoh_identifier) { RMW_SET_ERROR_MSG( "Event implementation identifier not from this implementation"); return RMW_RET_INCORRECT_RMW_IMPLEMENTATION; } - switch (event_handle->event_type) { - case RMW_EVENT_INVALID: - break; - default: - break; + auto zenoh_event_it = event_map.find(event_handle->event_type); + if (zenoh_event_it == event_map.end()) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("RMW Zenoh does not support event [%d]", event_handle->event_type); + return RMW_RET_ERROR; + } + + switch (zenoh_event_it->second) { + case ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE: { + rmw_subscription_data_t * sub_data = static_cast(event_handle->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + auto ei = static_cast(event_info); + ei->total_count = 0; + ei->total_count_change = 0; + *taken = true; + return RMW_RET_OK; + } + case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { + rmw_publisher_data_t * pub_data = static_cast(event_handle->data); + auto ei = static_cast(event_info); + ei->total_count = 0; + ei->total_count_change = 0; + *taken = true; + return RMW_RET_OK; + } + default: { + return RMW_RET_INVALID_ARGUMENT; + } } - *taken = false; return RMW_RET_ERROR; } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_qos.cpp b/rmw_zenoh_cpp/src/rmw_qos.cpp index 2cb42577..0dcf6a4d 100644 --- a/rmw_zenoh_cpp/src/rmw_qos.cpp +++ b/rmw_zenoh_cpp/src/rmw_qos.cpp @@ -28,7 +28,6 @@ rmw_qos_profile_check_compatible( char * reason, size_t reason_size) { - // In Zenoh, publishers do not have any reliability settings. // A publisher and subscription are only incompatible if the durability of the publisher is // TRANSIENT_LOCAL but that of the subscription is not. In such a scenario, a late-joining diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index c224e391..00614b4b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1390,9 +1390,8 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } - } - // Create a regular subscriber for all other durability settings. - else { + } else { + // Create a regular subscriber for all other durability settings. z_subscriber_options_t sub_options = z_subscriber_options_default(); if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; @@ -3176,6 +3175,17 @@ static bool has_triggered_condition( } // TODO(clalancette): Deal with events + // if (events) { + // for (size_t i = 0; i < events->event_count; ++i) { + // auto event = static_cast(events->events[i]); + // auto custom_event_info = static_cast(event->data); + // if (custom_event_info->get_listener()->get_statuscondition().get_trigger_value() || + // custom_event_info->get_listener()->get_event_guard(event->event_type).get_trigger_value()) + // { + // return true; + // } + // } + // } if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { @@ -3305,6 +3315,17 @@ rmw_wait( } } + // if (events) { + // for (size_t i = 0; i < events->event_count; ++i) { + // auto event = static_cast(events->events[i]); + // auto custom_event_info = static_cast(event->data); + // attached_conditions.push_back( + // &custom_event_info->get_listener()->get_statuscondition()); + // attached_conditions.push_back( + // &custom_event_info->get_listener()->get_event_guard(event->event_type)); + // } + // } + std::unique_lock lock(wait_set_data->condition_mutex); // According to the RMW documentation, if wait_timeout is NULL that means @@ -3641,10 +3662,13 @@ rmw_subscription_set_on_new_message_callback( rmw_event_callback_t callback, const void * user_data) { - static_cast(subscription); - static_cast(callback); - static_cast(user_data); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + rmw_subscription_data_t * sub_data = + static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + sub_data->set_on_new_message_callback( + user_data, callback); + return RMW_RET_OK; } //============================================================================== @@ -3655,10 +3679,13 @@ rmw_service_set_on_new_request_callback( rmw_event_callback_t callback, const void * user_data) { - static_cast(service); - static_cast(callback); - static_cast(user_data); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + rmw_service_data_t * service_data = + static_cast(service->data); + RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); + service_data->set_on_new_request_callback( + user_data, callback); + return RMW_RET_OK; } //============================================================================== @@ -3669,23 +3696,12 @@ rmw_client_set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data) { - static_cast(client); - static_cast(callback); - static_cast(user_data); - return RMW_RET_UNSUPPORTED; -} - -//============================================================================== -/// Set the callback function for the event. -rmw_ret_t -rmw_event_set_callback( - rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data) -{ - static_cast(event); - static_cast(callback); - static_cast(user_data); - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + rmw_client_data_t * client_data = + static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); + client_data->set_on_new_response_callback( + user_data, callback); + return RMW_RET_OK; } } // extern "C"