Skip to content

Commit

Permalink
Create PublisherData within NodeData
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Sep 27, 2024
1 parent 0053d8d commit d19c278
Show file tree
Hide file tree
Showing 14 changed files with 881 additions and 433 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
Expand Down
13 changes: 6 additions & 7 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
auto topic_info = pub_data->entity()->topic_info().value();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
if (topic_it != graph_topics_.end()) {
rmw_publisher_data_t * pub_data =
static_cast<rmw_publisher_data_t *>(publisher->data);
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
pub_data->type_support->get_name());
topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->subs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
pub_data->adapted_qos_profile,
pub_data->adapted_qos_profile(),
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand Down Expand Up @@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "ordered_map.hpp"
#include "rmw_publisher_data.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"
Expand Down Expand Up @@ -133,7 +134,7 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
Expand Down
21 changes: 21 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include "liveliness_utils.hpp"

#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -609,6 +611,12 @@ std::string Entity::node_enclave() const
return this->node_info_.enclave_;
}

///=============================================================================
NodeInfo Entity::node_info() const
{
return this->node_info_;
}

///=============================================================================
std::optional<TopicInfo> Entity::topic_info() const
{
Expand Down Expand Up @@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input)
return output;
}
} // namespace liveliness

void
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
{
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(
std::numeric_limits<unsigned char>::min(), std::numeric_limits<unsigned char>::max());

for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid[i] = dist(rng);
}
}
} // namespace rmw_zenoh_cpp
10 changes: 9 additions & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ class Entity

std::string node_enclave() const;

/// Get the topic_info.
/// Get the NodeInfo.
NodeInfo node_info() const;

/// Get the TopicInfo if present.
std::optional<TopicInfo> topic_info() const;

/// Get the liveliness keyexpr for this entity.
Expand Down Expand Up @@ -230,8 +233,13 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
/// Convert a Zenoh id to a string.
std::string zid_to_str(const z_id_t & id);
} // namespace liveliness

///=============================================================================
// Helper function to generate a randon GID.
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);
} // namespace rmw_zenoh_cpp


///=============================================================================
// Allow Entity to be hashed and used as a key in unordered_maps/sets
namespace std
Expand Down
13 changes: 12 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,17 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return RMW_RET_OK;
return ret;
}

// Shutdown all the nodes in this context.
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
ret = node_it->second->shutdown();
if (ret != RMW_RET_OK) {
return ret;
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
Expand All @@ -170,6 +179,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
rmw_context_impl_s::Data::~Data()
{
auto ret = this->shutdown();
nodes_.clear();
static_cast<void>(ret);
}

Expand Down Expand Up @@ -407,6 +417,7 @@ bool rmw_context_impl_s::create_node_data(
}

auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(data_->session_),
data_->domain_id_,
Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data()
z_drop(z_move(payload));
}

///=============================================================================
size_t rmw_publisher_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

///=============================================================================
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data)
Expand Down
39 changes: 0 additions & 39 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
class rmw_publisher_data_t final
{
public:
// The Entity generated for the publisher.
std::shared_ptr<liveliness::Entity> entity;

// An owned publisher.
z_owned_publisher_t pub;

// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache;

// Store the actual QoS profile used to configure this publisher.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
MessageTypeSupport * type_support;

// Context for memory allocation for messages.
rmw_context_t * context;

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

EventsManager events_mgr;

private:
std::mutex sequence_number_mutex_;
size_t sequence_number_{1};
};

///=============================================================================
// z_owned_closure_sample_t
void sub_data_handler(const z_sample_t * sample, void * sub_data);
Expand Down
111 changes: 108 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<NodeData> NodeData::make(
const rmw_node_t * const node,
std::size_t id,
z_session_t session,
std::size_t domain_id,
Expand Down Expand Up @@ -73,6 +74,7 @@ std::shared_ptr<NodeData> NodeData::make(

return std::shared_ptr<NodeData>(
new NodeData{
node,
id,
std::move(entity),
std::move(token)
Expand All @@ -81,20 +83,31 @@ std::shared_ptr<NodeData> NodeData::make(

///=============================================================================
NodeData::NodeData(
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token)
: id_(std::move(id)),
: node_(node),
id_(std::move(id)),
entity_(std::move(entity)),
token_(std::move(token))
token_(std::move(token)),
is_shutdown_(false),
pubs_({})
{
// Do nothing.
}

///=============================================================================
NodeData::~NodeData()
{
zc_liveliness_undeclare_token(z_move(token_));
const rmw_ret_t ret = this->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Error destructing node /%s.",
entity_->node_name().c_str()
);
}
}

///=============================================================================
Expand All @@ -103,4 +116,96 @@ std::size_t NodeData::id() const
std::lock_guard<std::mutex> lock(mutex_);
return id_;
}

///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
z_session_t session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
if (pubs_.count(publisher) > 0) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"PublisherData already exists.");
return false;
}

auto pub_data = PublisherData::make(
std::move(session),
node_,
entity_->node_info(),
id_,
std::move(id),
std::move(topic_name),
type_support,
qos_profile);
if (pub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to make PublisherData.");
return false;
}

auto insertion = pubs_.insert(std::make_pair(publisher, std::move(pub_data)));
if (!insertion.second) {
return false;
}
return true;
}

///=============================================================================
PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
auto it = pubs_.find(publisher);
if (it == pubs_.end()) {
return nullptr;
}

return it->second;
}

///=============================================================================
void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
pubs_.erase(publisher);
}

///=============================================================================
rmw_ret_t NodeData::shutdown()
{
std::lock_guard<std::mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return ret;
}

// Shutdown all the entities within this node.
for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
ret = pub_it->second->shutdown();
if (ret != RMW_RET_OK) {
return ret;
}
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));

is_shutdown_ = true;
return RMW_RET_OK;
}

///=============================================================================
// Check if the Node is shutdown.
bool NodeData::is_shutdown() const
{
std::lock_guard<std::mutex> lock(mutex_);
return is_shutdown_;
}

} // namespace rmw_zenoh_cpp
Loading

0 comments on commit d19c278

Please sign in to comment.