Skip to content

Commit

Permalink
Store rmw_node_data_t in rmw_context_impl_s
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Aug 6, 2024
1 parent eb936da commit 8f897b5
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 126 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_node_data_t.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
Expand Down
74 changes: 52 additions & 22 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void
}

// Update the graph cache.
std::lock_guard<std::mutex> lock(data_ptr->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_ptr->mutex_);
if (data_ptr->is_shutdown_) {
return;
}
Expand Down Expand Up @@ -169,7 +169,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe()
///=============================================================================
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -232,35 +232,35 @@ rmw_context_impl_s::rmw_context_impl_s(
///=============================================================================
std::string rmw_context_impl_s::enclave() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->enclave_;
}

///=============================================================================
z_session_t rmw_context_impl_s::session() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return z_loan(data_->session_);
}

///=============================================================================
std::optional<zc_owned_shm_manager_t> & rmw_context_impl_s::shm_manager()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_manager_;
}

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_guard_condition_;
}

///=============================================================================
size_t rmw_context_impl_s::get_next_entity_id()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->next_entity_id_++;
}

Expand All @@ -273,14 +273,14 @@ rmw_ret_t rmw_context_impl_s::shutdown()
///=============================================================================
bool rmw_context_impl_s::is_shutdown() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->is_shutdown_;
}

///=============================================================================
bool rmw_context_impl_s::session_is_valid() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return z_check(data_->session_);
}

Expand All @@ -291,7 +291,7 @@ rmw_ret_t rmw_context_impl_s::get_node_names(
rcutils_string_array_t * enclaves,
rcutils_allocator_t * allocator) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->get_node_names(
node_names,
node_namespaces,
Expand All @@ -305,7 +305,7 @@ rmw_ret_t rmw_context_impl_s::get_topic_names_and_types(
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->get_topic_names_and_types(
allocator,
no_demangle,
Expand All @@ -317,7 +317,7 @@ rmw_ret_t rmw_context_impl_s::publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count)
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->publisher_count_matched_subscriptions(
publisher,
subscription_count);
Expand All @@ -328,7 +328,7 @@ rmw_ret_t rmw_context_impl_s::subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count)
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->subscription_count_matched_publishers(
subscription,
publisher_count);
Expand All @@ -339,7 +339,7 @@ rmw_ret_t rmw_context_impl_s::get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->get_service_names_and_types(
allocator,
service_names_and_types);
Expand All @@ -350,7 +350,7 @@ rmw_ret_t rmw_context_impl_s::count_publishers(
const char * topic_name,
size_t * count) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->count_publishers(
topic_name,
count);
Expand All @@ -361,7 +361,7 @@ rmw_ret_t rmw_context_impl_s::count_subscriptions(
const char * topic_name,
size_t * count) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->count_subscriptions(
topic_name,
count);
Expand All @@ -372,7 +372,7 @@ rmw_ret_t rmw_context_impl_s::count_services(
const char * service_name,
size_t * count) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->count_services(
service_name,
count);
Expand All @@ -383,7 +383,7 @@ rmw_ret_t rmw_context_impl_s::count_clients(
const char * service_name,
size_t * count) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->count_clients(
service_name,
count);
Expand All @@ -398,7 +398,7 @@ rmw_ret_t rmw_context_impl_s::get_entity_names_and_types_by_node(
bool no_demangle,
rmw_names_and_types_t * names_and_types) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->get_entity_names_and_types_by_node(
entity_type,
allocator,
Expand All @@ -416,7 +416,7 @@ rmw_ret_t rmw_context_impl_s::get_entities_info_by_topic(
bool no_demangle,
rmw_topic_endpoint_info_array_t * endpoints_info) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->get_entities_info_by_topic(
entity_type,
allocator,
Expand All @@ -431,7 +431,7 @@ rmw_ret_t rmw_context_impl_s::service_server_is_available(
const char * service_type,
bool * is_available) const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->service_server_is_available(
service_name,
service_type,
Expand All @@ -444,9 +444,39 @@ void rmw_context_impl_s::set_qos_event_callback(
const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback)
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_->set_qos_event_callback(
std::move(entity),
event_type,
std::move(callback));
}

///=============================================================================
bool rmw_context_impl_s::add_node(
const rmw_node_t * const node,
std::unique_ptr<rmw_node_data_t> node_data)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
auto insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data)));
return insertion.second;
}

///=============================================================================
void rmw_context_impl_s::remove_node(
const rmw_node_t * const node)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
data_->nodes_.erase(node);
}

///=============================================================================
std::optional<std::size_t> rmw_context_impl_s::get_node_id(
const rmw_node_t * const node) const
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
auto it = data_->nodes_.find(node);
if (it == data_->nodes_.end()) {
return std::nullopt;
}
return it->second->id();
}
17 changes: 16 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

#include "graph_cache.hpp"
#include "liveliness_utils.hpp"
#include "rmw_node_data_t.hpp"

#include "rcutils/types.h"
#include "rmw/rmw.h"
Expand All @@ -33,6 +35,7 @@ class rmw_context_impl_s final
{
public:
using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback;
using rmw_node_data_t = rmw_zenoh_cpp::rmw_node_data_t;

// Constructor.
// Once constructed, the context_impl instanced will manage the lifetime
Expand Down Expand Up @@ -136,6 +139,16 @@ class rmw_context_impl_s final
const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

bool add_node(
const rmw_node_t * const node,
std::unique_ptr<rmw_node_data_t> node_data);

void remove_node(
const rmw_node_t * const node);

std::optional<std::size_t> get_node_id(
const rmw_node_t * const node) const;

private:
// Bundle all class members into a data struct which can be passed as a
// weak ptr to various threads for thread-safe access without capturing
Expand All @@ -161,7 +174,7 @@ class rmw_context_impl_s final
~Data();

// Mutex to lock when accessing members.
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
// RMW allocator.
const rcutils_allocator_t * allocator_;
// Enclave, name used to find security artifacts in a sros2 keystore.
Expand All @@ -186,6 +199,8 @@ class rmw_context_impl_s final
std::unique_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// True once graph subscriber is initialized.
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::unique_ptr<rmw_node_data_t>> nodes_;
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
14 changes: 0 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
struct rmw_node_data_t
{
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity;

// Liveliness token for the node.
zc_owned_liveliness_token_t token;

// The entity id of this node as generated by get_next_entity_id().
// Every interface created by this node will include this id in its liveliness token.
size_t id;
};

///=============================================================================
class rmw_publisher_data_t final
{
Expand Down
Loading

0 comments on commit 8f897b5

Please sign in to comment.