diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 0458bfd0..8bb24ed2 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -70,3 +70,98 @@ void sub_data_handler( z_drop(z_move(keystr)); } + +void service_data_handler(const z_query_t * query, void * service_data) +{ + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[service_data_handler] triggered" + ); + z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); + + auto rmw_service_data = static_cast(service_data); + if (rmw_service_data == nullptr) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain rmw_service_data_t from data for " + "service for %s", + z_loan(keystr) + ); + return; + } + + // Get the query parameters and payload + { + std::lock_guard lock(rmw_service_data->query_queue_mutex); + + const unsigned int client_id = rmw_service_data->get_new_uid(); + rmw_service_data->id_query_map.emplace( + std::make_pair(client_id, std::make_unique(z_query_clone(query)))); + rmw_service_data->to_take.push_back(client_id); + + + // Since we added new data, trigger the guard condition if it is available + std::lock_guard internal_lock(rmw_service_data->internal_mutex); + if (rmw_service_data->condition != nullptr) { + rmw_service_data->condition->notify_one(); + } + } + + z_drop(z_move(keystr)); +} + +void client_data_handler(z_owned_reply_t * reply, void * client_data) +{ + auto rmw_client_data = static_cast(client_data); + if (rmw_client_data == nullptr) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain rmw_client_data_t " + ); + return; + } + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[client_data_handler] triggered for %s", + rmw_client_data->service_name + ); + if (!z_reply_check(reply)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_check returned False" + ); + return; + } + if (!z_reply_is_ok(reply)) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False" + ); + return; + } + + z_sample_t sample = z_reply_ok(reply); + + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + + RCUTILS_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "[client_data_handler] keyexpr of sample: %s", + z_loan(keystr) + ); + + { + std::lock_guard msg_lock(rmw_client_data->message_mutex); + rmw_client_data->message = std::make_unique( + zc_sample_payload_rcinc(&sample), sample.timestamp.time, sample.timestamp.id.id); + } + { + std::lock_guard internal_lock(rmw_client_data->internal_mutex); + if (rmw_client_data->condition != nullptr) { + rmw_client_data->condition->notify_one(); + } + } + + z_reply_drop(reply); + z_drop(z_move(keystr)); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index d16e57ce..0a44556e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -31,6 +32,7 @@ #include "graph_cache.hpp" #include "message_type_support.hpp" +#include "service_type_support.hpp" /// Structs for various type erased data fields. @@ -131,4 +133,83 @@ struct rmw_subscription_data_t std::condition_variable * condition{nullptr}; }; + +///============================================================================== + +// z_owned_closure_query_t +void service_data_handler(const z_query_t * query, void * service_data); + +void client_data_handler(z_owned_reply_t * reply, void * client_data); + + +struct saved_queryable_data +{ + explicit saved_queryable_data(z_owned_query_t query) + : query(query) + { + } + + const z_owned_query_t query; +}; + +///============================================================================== + +struct rmw_service_data_t +{ + unsigned int get_new_uid() + { + return client_count++; + } + + const char * zn_queryable_key; + z_owned_queryable_t zn_queryable; + + const void * request_type_support_impl; + const void * response_type_support_impl; + const char * typesupport_identifier; + RequestTypeSupport * request_type_support; + ResponseTypeSupport * response_type_support; + + rmw_context_t * context; + + // Map to store the query id and the query. + // The query handler is saved as it is needed to answer the query later on. + std::unordered_map> id_query_map; + // The query id's of the queries that need to be processed. + std::deque to_take; + std::mutex query_queue_mutex; + + std::mutex internal_mutex; + std::condition_variable * condition{nullptr}; + + unsigned int client_count{}; +}; + +///============================================================================== + +struct rmw_client_data_t +{ + const char * service_name; + + // TODO(francocipollone): Remove this. For some reason if I remove this(not being even used) it + // ends up panicking when calling the service. Something is missing. + z_owned_reply_channel_t zn_reply_channel; + z_owned_closure_reply_t zn_closure_reply; + + + std::mutex message_mutex; + std::unique_ptr message; + + const void * request_type_support_impl; + const void * response_type_support_impl; + const char * typesupport_identifier; + RequestTypeSupport * request_type_support; + ResponseTypeSupport * response_type_support; + + rmw_context_t * context; + + std::mutex internal_mutex; + std::condition_variable * condition{nullptr}; +}; + #endif // DETAIL__RMW_DATA_TYPES_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 4fa0ebbb..9ef46cf0 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -381,6 +381,33 @@ static const rosidl_message_type_support_t * find_type_support( return type_support; } +//============================================================================== +static const rosidl_service_type_support_t * find_service_type_support( + const rosidl_service_type_support_t * type_supports) +{ + const rosidl_service_type_support_t * type_support = get_service_typesupport_handle( + type_supports, RMW_ZENOH_CPP_TYPESUPPORT_C); + if (!type_support) { + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_service_typesupport_handle( + type_supports, RMW_ZENOH_CPP_TYPESUPPORT_CPP); + if (!type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "Type support not from this implementation. Got:\n" + " %s\n" + " %s\n" + "while fetching it", + prev_error_string.str, error_string.str); + return nullptr; + } + } + + return type_support; +} + //============================================================================== /// Create a publisher and return a handle to that publisher. rmw_publisher_t * @@ -1628,15 +1655,199 @@ rmw_return_loaned_message_from_subscription( rmw_client_t * rmw_create_client( const rmw_node_t * node, - const rosidl_service_type_support_t * type_support, + const rosidl_service_type_support_t * type_supports, const char * service_name, const rmw_qos_profile_t * qos_profile) { - static_cast(node); - static_cast(type_support); - static_cast(service_name); - static_cast(qos_profile); - return nullptr; + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_common_cpp", + "[rmw_create_client] %s with queue of depth %ld", + service_name, + qos_profile->depth); + RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return nullptr); + + RMW_CHECK_ARGUMENT_FOR_NULL(service_name, nullptr); + if (strlen(service_name) == 0) { + RMW_SET_ERROR_MSG("service name is empty string"); + return nullptr; + } + RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); + RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr); + + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context, + "expected initialized context", + return nullptr); + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context->impl, + "expected initialized context impl", + return nullptr); + rmw_context_impl_s * context_impl = static_cast( + node->context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, + "unable to get rmw_context_impl_s", + return nullptr); + if (!z_check(context_impl->session)) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + // Validate service name + int validation_result; + + if (rmw_validate_full_topic_name(service_name, &validation_result, nullptr) != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_validate_full_topic_name failed"); + return nullptr; + } + + if (validation_result != RMW_TOPIC_VALID && !qos_profile->avoid_ros_namespace_conventions) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("service name is malformed: %s", service_name); + return nullptr; + } + + // client data + rmw_client_t * rmw_client = static_cast(allocator->allocate( + sizeof(rmw_client_t), + allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_client, + "failed to allocate memory for the client", + return nullptr); + + auto free_rmw_client = rcpputils::make_scope_exit( + [rmw_client, allocator]() { + allocator->deallocate(rmw_client, allocator->state); + }); + + auto client_data = static_cast( + allocator->allocate(sizeof(rmw_client_data_t), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data, + "failed to allocate memory for client data", + return nullptr); + auto free_client_data = rcpputils::make_scope_exit( + [client_data, allocator]() { + allocator->deallocate(client_data, allocator->state); + }); + + + // Obtain the type support + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_type_support + return nullptr; + } + + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + + client_data->context = node->context; + client_data->typesupport_identifier = type_support->typesupport_identifier; + client_data->request_type_support_impl = request_members; + client_data->response_type_support_impl = response_members; + + // Request type support + client_data->request_type_support = static_cast( + allocator->allocate(sizeof(RequestTypeSupport), allocator->state)); + + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data->request_type_support, + "Failed to allocate RequestTypeSupport", + return nullptr); + auto free_request_type_support = rcpputils::make_scope_exit( + [client_data, allocator]() { + allocator->deallocate(client_data->request_type_support, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW( + client_data->request_type_support, + client_data->request_type_support, + return nullptr, + RequestTypeSupport, service_members); + auto destruct_request_type_support = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->request_type_support->~RequestTypeSupport(), + RequestTypeSupport); + }); + + // Response type support + client_data->response_type_support = static_cast( + allocator->allocate(sizeof(ResponseTypeSupport), allocator->state)); + + RMW_CHECK_FOR_NULL_WITH_MSG( + client_data->response_type_support, + "Failed to allocate ResponseTypeSupport", + return nullptr); + auto free_response_type_support = rcpputils::make_scope_exit( + [client_data, allocator]() { + allocator->deallocate(client_data->response_type_support, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW( + client_data->response_type_support, + client_data->response_type_support, + return nullptr, + ResponseTypeSupport, service_members); + auto destruct_response_type_support = rcpputils::make_scope_exit( + [client_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + client_data->response_type_support->~ResponseTypeSupport(), + ResponseTypeSupport); + }); + + // Populate the rmw_client. + rmw_client->data = client_data; + rmw_client->implementation_identifier = rmw_zenoh_identifier; + + rmw_client->service_name = rcutils_strdup(service_name, *allocator); + + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_client->service_name, + "failed to allocate client name", + return nullptr); + + auto free_service_name = rcpputils::make_scope_exit( + [rmw_client, allocator]() { + allocator->deallocate(const_cast(rmw_client->service_name), allocator->state); + }); + + // Zenoh implementation for the client + + // TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function. + // If this is enough simply rename the method. + z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + rmw_client->service_name, node->context->actual_domain_id, allocator); + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_keyexpr_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + free_rmw_client.cancel(); + free_client_data.cancel(); + free_request_type_support.cancel(); + destruct_request_type_support.cancel(); + free_response_type_support.cancel(); + destruct_response_type_support.cancel(); + free_service_name.cancel(); + + return rmw_client; } //============================================================================== @@ -1644,9 +1855,35 @@ rmw_create_client( rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) { - static_cast(node); - static_cast(client); - return RMW_RET_UNSUPPORTED; + RCUTILS_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "[rmw_destroy_client] %s", client->service_name); + + // ASSERTIONS ================================================================ + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + auto client_data = static_cast(client->data); + + // CLEANUP =================================================================== + allocator->deallocate(client_data->request_type_support, allocator->state); + allocator->deallocate(client_data->response_type_support, allocator->state); + allocator->deallocate(client->data, allocator->state); + + allocator->deallocate(const_cast(client->service_name), allocator->state); + allocator->deallocate(client, allocator->state); + + return RMW_RET_OK; } //============================================================================== @@ -1657,10 +1894,93 @@ rmw_send_request( const void * ros_request, int64_t * sequence_id) { - static_cast(client); - static_cast(ros_request); - static_cast(sequence_id); - return RMW_RET_UNSUPPORTED; + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", "[rmw_send_request]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sequence_id, RMW_RET_INVALID_ARGUMENT); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_CHECK_FOR_NULL_WITH_MSG( + client->data, + "client implementation pointer is null", + RMW_RET_INVALID_ARGUMENT); + + auto * client_data = static_cast(client->data); + + rmw_context_impl_s * context_impl = static_cast( + client_data->context->impl); + + // Serialize data + + rcutils_allocator_t * allocator = &(client_data->context->options.allocator); + + size_t max_data_length = ( + client_data->request_type_support->get_estimated_serialized_size( + ros_request, client_data->request_type_support_impl)); + + // Init serialized message byte array + char * request_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!request_bytes) { + RMW_SET_ERROR_MSG("failed allocate request message bytes"); + allocator->deallocate(request_bytes, allocator->state); + return RMW_RET_ERROR; + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(request_bytes, max_data_length); + + // Object that serializes the data + eprosima::fastcdr::Cdr ser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!client_data->request_type_support->serialize_ros_message( + ros_request, + ser, + client_data->request_type_support_impl)) + { + allocator->deallocate(request_bytes, allocator->state); + return RMW_RET_ERROR; + } + + size_t data_length = ser.getSerializedDataLength(); + + // TODO(francocipollone): Do I really need the sequency number here? + *sequence_id = 0; + + + // Send request + z_get_options_t opts = z_get_options_default(); + opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; + opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + + + z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + client->service_name, client_data->context->actual_domain_id, allocator); + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_keyexpr_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return RMW_RET_ERROR; + } + + client_data->service_name = client->service_name; + client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); + + z_get(z_loan(context_impl->session), z_loan(keyexpr), "", &client_data->zn_closure_reply, &opts); + + return RMW_RET_OK; } //============================================================================== @@ -1672,11 +1992,67 @@ rmw_take_response( void * ros_response, bool * taken) { - static_cast(client); - static_cast(request_header); - static_cast(ros_response); - static_cast(taken); - return RMW_RET_UNSUPPORTED; + *taken = false; + RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_response]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_CHECK_FOR_NULL_WITH_MSG( + client->service_name, "client has no service name", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + client->data, "client implementation pointer is null", RMW_RET_INVALID_ARGUMENT); + + + auto client_data = static_cast(client->data); + + std::unique_ptr msg_data = nullptr; + + { + std::lock_guard lock(client_data->message_mutex); + if (client_data->message == nullptr) { + // TODO(francocipollone): Verify behavior. + RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "[rmw_take_response] Response message is empty"); + return RMW_RET_ERROR; + } + + msg_data = std::move(client_data->message); + client_data->message.release(); + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(msg_data->payload.payload.start)), + msg_data->payload.payload.len); + + // Object that serializes the data + eprosima::fastcdr::Cdr deser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!client_data->response_type_support->deserialize_ros_message( + deser, + ros_response, + client_data->response_type_support_impl)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + *taken = true; + zc_payload_drop(&(msg_data->payload)); + + // TODO(francocipollone): Verify request_header information. + request_header->request_id.sequence_number = 0; + + return RMW_RET_OK; } //============================================================================== @@ -1686,9 +2062,10 @@ rmw_client_request_publisher_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { + // TODO(francocipollone): Fix. static_cast(client); static_cast(qos); - return RMW_RET_UNSUPPORTED; + return RMW_RET_OK; } //============================================================================== @@ -1698,9 +2075,10 @@ rmw_client_response_subscription_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { + // TODO(francocipollone): Fix. static_cast(client); static_cast(qos); - return RMW_RET_UNSUPPORTED; + return RMW_RET_OK; } //============================================================================== @@ -1712,8 +2090,12 @@ rmw_create_service( const char * service_name, const rmw_qos_profile_t * qos_profiles) { - // Interim implementation to suppress type_description service that spins up - // with a node by default. + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[rmw_create_service] %s", + service_name); + + // ASSERTIONS ================================================================ RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, @@ -1729,6 +2111,7 @@ rmw_create_service( RMW_CHECK_ARGUMENT_FOR_NULL(qos_profiles, nullptr); if (!qos_profiles->avoid_ros_namespace_conventions) { int validation_result = RMW_TOPIC_VALID; + // TODO(francocipollone): Verify if this is the right way to validate the service name. rmw_ret_t ret = rmw_validate_full_topic_name(service_name, &validation_result, nullptr); if (RMW_RET_OK != ret) { return nullptr; @@ -1739,20 +2122,199 @@ rmw_create_service( return nullptr; } } - // rmw_qos_profile_t adapted_qos_profile = - // rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); - // if (!is_valid_qos(adapted_qos_profile)) { - // RMW_SET_ERROR_MSG("create_service() called with invalid QoS"); - // return nullptr; - // } - rmw_service_t * service = rmw_service_allocate(); - if (!service) { - RMW_SET_ERROR_MSG("failed to allocate rmw_service_t"); + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context, + "expected initialized context", + return nullptr); + RMW_CHECK_FOR_NULL_WITH_MSG( + node->context->impl, + "expected initialized context impl", + return nullptr); + rmw_context_impl_s * context_impl = static_cast( + node->context->impl); + RMW_CHECK_FOR_NULL_WITH_MSG( + context_impl, + "unable to get rmw_context_impl_s", + return nullptr); + if (!z_check(context_impl->session)) { + RMW_SET_ERROR_MSG("zenoh session is invalid"); + return nullptr; + } + + // SERVICE DATA ============================================================== + rcutils_allocator_t * allocator = &node->context->options.allocator; + + rmw_service_t * rmw_service = static_cast(allocator->allocate( + sizeof(rmw_service_t), + allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_service, + "failed to allocate memory for the service", + return nullptr); + + auto free_rmw_service = rcpputils::make_scope_exit( + [rmw_service, allocator]() { + allocator->deallocate(rmw_service, allocator->state); + }); + + auto service_data = static_cast( + allocator->allocate(sizeof(rmw_service_data_t), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data, + "failed to allocate memory for service data", + return nullptr); + auto free_service_data = rcpputils::make_scope_exit( + [service_data, allocator]() { + allocator->deallocate(service_data, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW(service_data, service_data, return nullptr, rmw_service_data_t); + auto destruct_service_data = rcpputils::make_scope_exit( + [service_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + service_data->~rmw_service_data_t(), + rmw_service_data_t); + }); + + // Get the RMW type support. + const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); + if (type_support == nullptr) { + // error was already set by find_type_support + return nullptr; + } + + // TODO(francocipollone): Verify if this is the right way to get the + // type support as the architecture for the service here + // is different to the one used in DDS (with fastcdr). + auto service_members = static_cast(type_support->data); + auto request_members = static_cast( + service_members->request_members_->data); + auto response_members = static_cast( + service_members->response_members_->data); + + service_data->context = node->context; + service_data->typesupport_identifier = type_support->typesupport_identifier; + service_data->request_type_support_impl = request_members; + service_data->response_type_support_impl = response_members; + + // Request type support + service_data->request_type_support = static_cast( + allocator->allocate(sizeof(RequestTypeSupport), allocator->state)); + + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data->request_type_support, + "Failed to allocate RequestTypeSupport", + return nullptr); + auto free_request_type_support = rcpputils::make_scope_exit( + [service_data, allocator]() { + allocator->deallocate(service_data->request_type_support, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW( + service_data->request_type_support, + service_data->request_type_support, + return nullptr, + RequestTypeSupport, service_members); + auto destruct_request_type_support = rcpputils::make_scope_exit( + [service_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + service_data->request_type_support->~RequestTypeSupport(), + RequestTypeSupport); + }); + + // Response type support + service_data->response_type_support = static_cast( + allocator->allocate(sizeof(ResponseTypeSupport), allocator->state)); + + RMW_CHECK_FOR_NULL_WITH_MSG( + service_data->response_type_support, + "Failed to allocate ResponseTypeSupport", + return nullptr); + auto free_response_type_support = rcpputils::make_scope_exit( + [service_data, allocator]() { + allocator->deallocate(service_data->response_type_support, allocator->state); + }); + + RMW_TRY_PLACEMENT_NEW( + service_data->response_type_support, + service_data->response_type_support, + return nullptr, + ResponseTypeSupport, service_members); + auto destruct_response_type_support = rcpputils::make_scope_exit( + [service_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + service_data->response_type_support->~ResponseTypeSupport(), + ResponseTypeSupport); + }); + + // Populate the rmw_service. + rmw_service->data = service_data; + rmw_service->implementation_identifier = rmw_zenoh_identifier; + + rmw_service->service_name = rcutils_strdup(service_name, *allocator); + + RMW_CHECK_FOR_NULL_WITH_MSG( + rmw_service->service_name, + "failed to allocate service name", + return nullptr); + + auto free_service_name = rcpputils::make_scope_exit( + [rmw_service, allocator]() { + allocator->deallocate(const_cast(rmw_service->service_name), allocator->state); + }); + + + // Zenoh implementation for the service + + // TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function. + // If this is enough simply rename the method. + z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( + rmw_service->service_name, node->context->actual_domain_id, allocator); + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_keyexpr_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", "[rmw_create_service] keyexpr: %s", + z_loan(z_keyexpr_to_string(z_loan(keyexpr)))); + + z_owned_closure_query_t callback = z_closure(service_data_handler, nullptr, service_data); + + service_data->zn_queryable = z_declare_queryable( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + nullptr); + + if (!z_check(service_data->zn_queryable)) { + RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } - return service; + auto undeclare_z_queryable = rcpputils::make_scope_exit( + [service_data]() { + z_undeclare_queryable(z_move(service_data->zn_queryable)); + }); + + + // TODO(francocipollone): Update graph cache. + + free_rmw_service.cancel(); + free_service_data.cancel(); + free_service_name.cancel(); + destruct_service_data.cancel(); + destruct_request_type_support.cancel(); + destruct_response_type_support.cancel(); + free_request_type_support.cancel(); + free_response_type_support.cancel(); + undeclare_z_queryable.cancel(); + return rmw_service; } //============================================================================== @@ -1766,6 +2328,8 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) return RMW_RET_ERROR; } rmw_service_free(service); + + // TODO(francocipollone): Update graph cache. return RMW_RET_OK; } @@ -1778,11 +2342,84 @@ rmw_take_request( void * ros_request, bool * taken) { - static_cast(service); - static_cast(request_header); - static_cast(ros_request); - static_cast(taken); - return RMW_RET_UNSUPPORTED; + *taken = false; + RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_CHECK_FOR_NULL_WITH_MSG( + service->service_name, "service has no service name", RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_FOR_NULL_WITH_MSG( + service->data, "service implementation pointer is null", RMW_RET_INVALID_ARGUMENT); + + + auto * service_data = static_cast(service->data); + + std::unique_lock lock(service_data->query_queue_mutex); + + if (service_data->id_query_map.empty()) { + // TODO(francocipollone): Verify behavior. + RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] Take id_query_map is empty"); + return RMW_RET_OK; + } + + const unsigned int query_id = service_data->to_take.back(); + const auto query_ret = service_data->id_query_map.find(query_id); + if (query_ret == service_data->id_query_map.end()) { + RMW_SET_ERROR_MSG("Query id not found in id_query_map"); + return RMW_RET_ERROR; + } + const saved_queryable_data * query = (*query_ret).second.get(); + const z_owned_query_t * owned_query_ptr = &query->query; + // TODO(francocipollone): Remove the query id from the to_take collection + service_data->to_take.pop_back(); + service_data->query_queue_mutex.unlock(); + + // DESERIALIZE MESSAGE ======================================================== + if (!z_query_check(owned_query_ptr)) { + RMW_SET_ERROR_MSG("onwed_query_t contains gravestone, can't deserialize message"); + return RMW_RET_ERROR; + } + const z_query_t z_loaned_query = z_query_loan(owned_query_ptr); + z_value_t payload_value = z_query_value(&z_loaned_query); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload_value.payload.start)), + payload_value.payload.len); + + // Object that serializes the data + eprosima::fastcdr::Cdr deser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!service_data->request_type_support->deserialize_ros_message( + deser, + ros_request, + service_data->request_type_support_impl)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } + + RCUTILS_LOG_INFO_NAMED("rmw_zenoh_cpp", "[rmw_take_request] deserialized message"); + + + // Fill in the request header. + request_header->request_id.sequence_number = query_id; + + + *taken = true; + + return RMW_RET_OK; } //============================================================================== @@ -1793,10 +2430,82 @@ rmw_send_response( rmw_request_id_t * request_header, void * ros_response) { - static_cast(service); - static_cast(request_header); - static_cast(ros_response); - return RMW_RET_UNSUPPORTED; + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", "[rmw_send_response]"); + + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT); + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_CHECK_FOR_NULL_WITH_MSG( + service->data, + "service implementation pointer is null", + RMW_RET_INVALID_ARGUMENT); + + auto * service_data = static_cast(service->data); + + rcutils_allocator_t * allocator = &(service_data->context->options.allocator); + + size_t max_data_length = ( + service_data->response_type_support->get_estimated_serialized_size( + ros_response, service_data->response_type_support_impl)); + + // Init serialized message byte array + char * response_bytes = static_cast(allocator->allocate( + max_data_length, + allocator->state)); + if (!response_bytes) { + RMW_SET_ERROR_MSG("failed allocate response message bytes"); + allocator->deallocate(response_bytes, allocator->state); + return RMW_RET_ERROR; + } + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer(response_bytes, max_data_length); + + // Object that serializes the data + eprosima::fastcdr::Cdr ser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (!service_data->response_type_support->serialize_ros_message( + ros_response, + ser, + service_data->response_type_support_impl)) + { + allocator->deallocate(response_bytes, allocator->state); + return RMW_RET_ERROR; + } + + size_t data_length = ser.getSerializedDataLength(); + + size_t meta_length = sizeof(request_header->sequence_number); + memcpy( + &response_bytes[data_length], + reinterpret_cast(&request_header->sequence_number), + meta_length); + + // Create the queryable payload + service_data->query_queue_mutex.lock(); + auto query = service_data->id_query_map[request_header->sequence_number] + ->query; + service_data->query_queue_mutex.unlock(); + + z_query_reply_options_t options = z_query_reply_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); + const z_query_t loaned_query = z_loan(query); + const z_query_t * query_ptr = &loaned_query; + z_query_reply( + query_ptr, z_query_keyexpr(query_ptr), reinterpret_cast( + response_bytes), data_length + meta_length, &options); + + return RMW_RET_OK; } //============================================================================== @@ -2054,6 +2763,29 @@ rmw_wait( } } + + if (services) { + // Go through each of the services and attach the wait set condition variable to them. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + serv_data->condition = &wait_set_data->condition_variable; + } + } + } + + if (clients) { + // Go through each of the clients and attach the wait set condition variable to them. + // That way they can wake it up if they are triggered while we are waiting. + for (size_t i = 0; i < clients->client_count; ++i) { + auto client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + client_data->condition = &wait_set_data->condition_variable; + } + } + } + std::unique_lock lock(wait_set_data->condition_mutex); // According to the RMW documentation, if wait_timeout is NULL that means @@ -2099,6 +2831,37 @@ rmw_wait( } } + if (services) { + // Now detach the condition variable and mutex from each of the services + for (size_t i = 0; i < services->service_count; ++i) { + auto serv_data = static_cast(services->services[i]); + if (serv_data != nullptr) { + serv_data->condition = nullptr; + if (serv_data->to_take.empty()) { + // Setting to nullptr lets rcl know that this service is not ready + services->services[i] = nullptr; + } + } + } + } + + if (clients) { + // Now detach the condition variable and mutex from each of the clients + for (size_t i = 0; i < clients->client_count; ++i) { + auto client_data = static_cast(clients->clients[i]); + if (client_data != nullptr) { + client_data->condition = nullptr; + // According to the documentation for rmw_wait in rmw.h, entries in the + // array that have *not* been triggered should be set to NULL + if (client_data->message == nullptr) { + // Setting to nullptr lets rcl know that this client is not ready + clients->clients[i] = nullptr; + } + } + } + } + + return RMW_RET_OK; } @@ -2274,10 +3037,12 @@ rmw_service_server_is_available( const rmw_client_t * client, bool * is_available) { + // TODO(francocipollone): Provide a proper implementation. + // We need graph cache information for this. + *is_available = true; static_cast(node); static_cast(client); - static_cast(is_available); - return RMW_RET_UNSUPPORTED; + return RMW_RET_OK; } //==============================================================================