Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread-safe access to rmw_node_data_t #259

Merged
merged 7 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_node_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
Expand Down
89 changes: 76 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void
}

// Update the graph cache.
std::lock_guard<std::mutex> lock(data_ptr->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_ptr->mutex_);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
if (data_ptr->is_shutdown_) {
return;
}
Expand All @@ -81,19 +81,22 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void

///=============================================================================
rmw_context_impl_s::Data::Data(
std::size_t domain_id,
const std::string & enclave,
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
z_owned_session_t session,
std::optional<zc_owned_shm_manager_t> shm_manager,
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
: enclave_(std::move(enclave)),
domain_id_(std::move(domain_id)),
session_(std::move(session)),
shm_manager_(std::move(shm_manager)),
liveliness_str_(std::move(liveliness_str)),
graph_cache_(std::move(graph_cache)),
is_shutdown_(false),
next_entity_id_(0),
is_initialized_(false)
is_initialized_(false),
nodes_({})
{
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand All @@ -103,7 +106,7 @@ rmw_context_impl_s::Data::Data(
///=============================================================================
rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_initialized_) {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -145,7 +148,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
///=============================================================================
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -308,6 +311,7 @@ rmw_context_impl_s::rmw_context_impl_s(
free_shm_manager.cancel();

data_ = std::make_shared<Data>(
domain_id,
std::move(enclave),
std::move(session),
std::move(shm_manager),
Expand All @@ -323,35 +327,35 @@ rmw_context_impl_s::rmw_context_impl_s(
///=============================================================================
std::string rmw_context_impl_s::enclave() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->enclave_;
}

///=============================================================================
z_session_t rmw_context_impl_s::session() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return z_loan(data_->session_);
}

///=============================================================================
std::optional<zc_owned_shm_manager_t> & rmw_context_impl_s::shm_manager()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_manager_;
}

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_guard_condition_.get();
}

///=============================================================================
size_t rmw_context_impl_s::get_next_entity_id()
std::size_t rmw_context_impl_s::get_next_entity_id()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->next_entity_id_++;
}

Expand All @@ -364,20 +368,79 @@ rmw_ret_t rmw_context_impl_s::shutdown()
///=============================================================================
bool rmw_context_impl_s::is_shutdown() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->is_shutdown_;
}

///=============================================================================
bool rmw_context_impl_s::session_is_valid() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return z_check(data_->session_);
}

///=============================================================================
std::shared_ptr<rmw_zenoh_cpp::GraphCache> rmw_context_impl_s::graph_cache()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_;
}

///=============================================================================
bool rmw_context_impl_s::create_node_data(
const rmw_node_t * const node,
const std::string & ns,
const std::string & node_name)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
if (data_->nodes_.count(node) > 0) {
// Node already exists.
return false;
}

// Check that the Zenoh session is still valid.
if (!z_check(data_->session_)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create NodeData as Zenoh session is invalid.");
return false;
}

auto node_data = rmw_zenoh_cpp::NodeData::make(
this->get_next_entity_id(),
z_loan(data_->session_),
data_->domain_id_,
ns,
node_name,
data_->enclave_);
if (node_data == nullptr) {
// Error already handled.
return false;
}

auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data)));
if (!node_insertion.second) {
return false;
}

return true;
}

///=============================================================================
std::shared_ptr<rmw_zenoh_cpp::NodeData> rmw_context_impl_s::get_node_data(
const rmw_node_t * const node)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
auto node_it = data_->nodes_.find(node);
if (node_it == data_->nodes_.end()) {
return nullptr;
}
return node_it->second;
}

///=============================================================================
void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
data_->nodes_.erase(node);
}
30 changes: 26 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

#include "graph_cache.hpp"
#include "guard_condition.hpp"
#include "liveliness_utils.hpp"
#include "rmw_node_data.hpp"

#include "rcutils/types.h"
#include "rmw/rmw.h"
Expand All @@ -34,7 +36,7 @@
class rmw_context_impl_s final
{
public:
// Constructor that internally initializees the Zenoh session and other artifacts.
// Constructor that internally initializes the Zenoh session and other artifacts.
// Throws an std::runtime_error if any of the initializations fail.
// The construction will block until a Zenoh router is detected.
// TODO(Yadunund): Make this a non-blocking call by checking for the Zenoh
Expand Down Expand Up @@ -62,7 +64,7 @@ class rmw_context_impl_s final
rmw_guard_condition_t * graph_guard_condition();

// Get a unique id for a new entity.
size_t get_next_entity_id();
std::size_t get_next_entity_id();

// Shutdown the Zenoh session.
rmw_ret_t shutdown();
Expand All @@ -76,6 +78,21 @@ class rmw_context_impl_s final
/// Return a shared_ptr to the GraphCache stored in this context.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache();

/// Create a NodeData and store it within this context. The NodeData can be
/// retrieved using get_node().
/// Returns false if parameters are invalid.
bool create_node_data(
const rmw_node_t * const node,
const std::string & ns,
const std::string & node_name);

/// Retrieve the NodeData for a given rmw_node_t if present.
std::shared_ptr<rmw_zenoh_cpp::NodeData> get_node_data(
const rmw_node_t * const node);

/// Delete the NodeData for a given rmw_node_t if present.
void delete_node_data(const rmw_node_t * const node);

private:
// Bundle all class members into a data struct which can be passed as a
// weak ptr to various threads for thread-safe access without capturing
Expand All @@ -84,6 +101,7 @@ class rmw_context_impl_s final
{
// Constructor.
Data(
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<zc_owned_shm_manager_t> shm_manager,
Expand All @@ -100,11 +118,13 @@ class rmw_context_impl_s final
~Data();

// Mutex to lock when accessing members.
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
// RMW allocator.
const rcutils_allocator_t * allocator_;
// Enclave, name used to find security artifacts in a sros2 keystore.
std::string enclave_;
// The ROS domain id of this context.
std::size_t domain_id_;
// An owned session.
z_owned_session_t session_;
// An optional SHM manager that is initialized of SHM is enabled in the
Expand All @@ -124,9 +144,11 @@ class rmw_context_impl_s final
// Shutdown flag.
bool is_shutdown_;
// A counter to assign a local id for every entity created in this session.
size_t next_entity_id_;
std::size_t next_entity_id_;
// True once graph subscriber is initialized.
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
14 changes: 0 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
struct rmw_node_data_t
{
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity;

// Liveliness token for the node.
zc_owned_liveliness_token_t token;

// The entity id of this node as generated by get_next_entity_id().
// Every interface created by this node will include this id in its liveliness token.
size_t id;
};

///=============================================================================
class rmw_publisher_data_t final
{
Expand Down
Loading