Skip to content

Commit

Permalink
Implement custom functions to adapt qos (#234)
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored Jul 3, 2024
1 parent 9ede38d commit 747a72e
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 56 deletions.
3 changes: 1 addition & 2 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(rmw_dds_common REQUIRED)
find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc_debug QUIET)
if(NOT zenohc_debug_FOUND)
Expand All @@ -38,6 +37,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/liveliness_utils.cpp
src/detail/logging.cpp
src/detail/message_type_support.cpp
src/detail/qos.cpp
src/detail/rmw_data_types.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
Expand Down Expand Up @@ -65,7 +65,6 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
rmw_dds_common::rmw_dds_common_library
zenohc::lib
)

Expand Down
1 change: 0 additions & 1 deletion rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>rmw_dds_common</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/src/detail/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#include "logging.hpp"

#include <sstream>

namespace rmw_zenoh_cpp
{
///=============================================================================
Expand Down
5 changes: 0 additions & 5 deletions rmw_zenoh_cpp/src/detail/logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

#include <rcutils/logging.h>

#include <iostream>
#include <mutex>
#include <string>
#include <sstream>

namespace rmw_zenoh_cpp
{
///=============================================================================
Expand Down
134 changes: 134 additions & 0 deletions rmw_zenoh_cpp/src/detail/qos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "qos.hpp"

// Define defaults for various QoS settings.
#define RMW_ZENOH_DEFAULT_HISTORY RMW_QOS_POLICY_HISTORY_KEEP_LAST;
// If the depth field in the qos profile is set to 0, the RMW implementation
// has the liberty to assign a default depth. The zenoh transport protocol
// is configured with 256 channels so theoretically, this would be the maximum
// depth we can set before blocking transport. A high depth would increase the
// memory footprint of processes as more messages are stored in memory while a
// very low depth might unintentionally drop messages leading to a poor
// out-of-the-box experience for new users. For now we set the depth to 42,
// a popular "magic number". See https://en.wikipedia.org/wiki/42_(number).
#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 42;
#define RMW_ZENOH_DEFAULT_RELIABILITY RMW_QOS_POLICY_RELIABILITY_RELIABLE;
#define RMW_ZENOH_DEFAULT_DURABILITY RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
#define RMW_ZENOH_DEFAULT_DEADLINE RMW_DURATION_INFINITE;
#define RMW_ZENOH_DEFAULT_LIFESPAN RMW_DURATION_INFINITE;
#define RMW_ZENOH_DEFAULT_LIVELINESS RMW_QOS_POLICY_LIVELINESS_AUTOMATIC;
#define RMW_ZENOH_DEFAULT_LIVELINESS_LEASE_DURATION RMW_DURATION_INFINITE;

namespace rmw_zenoh_cpp
{
///=============================================================================
QoS::QoS()
{
default_qos_.history = RMW_ZENOH_DEFAULT_HISTORY;
default_qos_.depth = RMW_ZENOH_DEFAULT_HISTORY_DEPTH;
default_qos_.reliability = RMW_ZENOH_DEFAULT_RELIABILITY;
default_qos_.durability = RMW_ZENOH_DEFAULT_DURABILITY;
default_qos_.deadline = RMW_ZENOH_DEFAULT_DEADLINE;
default_qos_.lifespan = RMW_ZENOH_DEFAULT_LIFESPAN;
default_qos_.liveliness = RMW_ZENOH_DEFAULT_LIVELINESS;
default_qos_.liveliness_lease_duration = RMW_ZENOH_DEFAULT_LIVELINESS_LEASE_DURATION;
}

///=============================================================================
QoS & QoS::get()
{
static QoS qos;
return qos;
}

///=============================================================================
const rmw_qos_profile_t & QoS::default_qos() const
{
return default_qos_;
}

///=============================================================================
rmw_ret_t QoS::best_available_qos(
const rmw_node_t * node,
const char * topic_name,
rmw_qos_profile_t * qos_profile,
const GetEndpointInfoByTopicFunction & get_endpoint_info_for_other) const
{
// We could rely on the GetEndpointInfoByTopicFunction callback to get
// endpoint information of downstream consumers to match certain QoS settings.
// Practically since Zenoh transport will succeed for any combination of
// settings, this could only be useful to avoid creating transient_local
// subscriptions when there are no transient_local publishers in the graph.
// This will avoid some overhead with having QueryingSubscriber. For now,
// we skip this optimization.
static_cast<void>(node);
static_cast<void>(topic_name);
static_cast<void>(get_endpoint_info_for_other);

switch (qos_profile->history) {
case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
qos_profile->history = default_qos_.history;
default:
break;
}

if (qos_profile->depth == 0) {
qos_profile->depth = default_qos_.depth;
}

switch (qos_profile->reliability) {
case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
qos_profile->reliability = default_qos_.reliability;
default:
break;
}

switch (qos_profile->durability) {
case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_DURABILITY_UNKNOWN:
qos_profile->durability = default_qos_.durability;
default:
break;
}

if (rmw_time_equal(qos_profile->deadline, RMW_QOS_DEADLINE_DEFAULT)) {
qos_profile->deadline = default_qos_.deadline;
}

if (rmw_time_equal(qos_profile->lifespan, RMW_QOS_LIFESPAN_DEFAULT)) {
qos_profile->lifespan = default_qos_.lifespan;
}

if (rmw_time_equal(
qos_profile->liveliness_lease_duration,
RMW_QOS_LIVELINESS_LEASE_DURATION_DEFAULT))
{
qos_profile->liveliness_lease_duration = default_qos_.liveliness_lease_duration;
}

switch (qos_profile->liveliness) {
case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_LIVELINESS_UNKNOWN:
qos_profile->liveliness = default_qos_.liveliness;
default:
break;
}

return RMW_RET_OK;
}
} // namespace rmw_zenoh_cpp
56 changes: 56 additions & 0 deletions rmw_zenoh_cpp/src/detail/qos.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__QOS_HPP_
#define DETAIL__QOS_HPP_

#include <rcutils/logging.h>
#include <rmw/topic_endpoint_info_array.h>
#include <rmw/types.h>

#include <functional>

namespace rmw_zenoh_cpp
{
//==============================================================================
/// Signature matching rmw_get_publishers_info_by_topic and rmw_get_subscriptions_info_by_topic
using GetEndpointInfoByTopicFunction = std::function<rmw_ret_t(
const rmw_node_t *,
rcutils_allocator_t *,
const char *,
bool,
rmw_topic_endpoint_info_array_t *)>;

//==============================================================================
class QoS
{
public:
static QoS & get();

const rmw_qos_profile_t & default_qos() const;

rmw_ret_t best_available_qos(
const rmw_node_t * node,
const char * topic_name,
rmw_qos_profile_t * qos_profile,
const GetEndpointInfoByTopicFunction & get_endpoint_info_for_other) const;

private:
// Private constructor which initializes the default qos.
QoS();
rmw_qos_profile_t default_qos_;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__QOS_HPP_
12 changes: 9 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ void rmw_subscription_data_t::add_new_message(
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);

if (message_queue_.size() >= adapted_qos_profile.depth) {
if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
message_queue_.size() >= adapted_qos_profile.depth)
{
// Log warning if message is discarded due to hitting the queue depth
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -238,7 +240,9 @@ void rmw_service_data_t::notify()
void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (query_queue_.size() >= adapted_qos_profile.depth) {
if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
query_queue_.size() >= adapted_qos_profile.depth)
{
// Log warning if message is discarded due to hitting the queue depth
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr));
RMW_ZENOH_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -331,7 +335,9 @@ void rmw_client_data_t::notify()
void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);
if (reply_queue_.size() >= adapted_qos_profile.depth) {
if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL &&
reply_queue_.size() >= adapted_qos_profile.depth)
{
// Log warning if message is discarded due to hitting the queue depth
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(this->keyexpr));
RMW_ZENOH_LOG_ERROR_NAMED(
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/src/rmw_qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
#include "rmw/types.h"
#include "rmw/qos_profiles.h"

#include "rmw_dds_common/qos.hpp"

extern "C"
{
namespace
Expand Down
Loading

0 comments on commit 747a72e

Please sign in to comment.