Skip to content

Commit

Permalink
Thread-safe access to rmw_node_data_t (#259)
Browse files Browse the repository at this point in the history
* Store node data within context impl

Signed-off-by: Yadunund <[email protected]>

* Make NodeData constructor public

Signed-off-by: Yadunund <[email protected]>

* Switch to recursive_mutex in rmw_context_impl_s

Signed-off-by: Yadunund <[email protected]>

* NodeData API to construct liveliness token

Signed-off-by: Yadunund <[email protected]>

* Use count to check if node exists

Signed-off-by: Yadunund <[email protected]>

* Fix compilation after rebase

Signed-off-by: Yadunund <[email protected]>

* Address feedback

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored Sep 30, 2024
1 parent 67ed661 commit 824886a
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 123 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_node_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
Expand Down
89 changes: 76 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void
}

// Update the graph cache.
std::lock_guard<std::mutex> lock(data_ptr->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_ptr->mutex_);
if (data_ptr->is_shutdown_) {
return;
}
Expand All @@ -81,19 +81,22 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void

///=============================================================================
rmw_context_impl_s::Data::Data(
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<zc_owned_shm_manager_t> shm_manager,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
: enclave_(std::move(enclave)),
domain_id_(std::move(domain_id)),
session_(std::move(session)),
shm_manager_(std::move(shm_manager)),
liveliness_str_(std::move(liveliness_str)),
graph_cache_(std::move(graph_cache)),
is_shutdown_(false),
next_entity_id_(0),
is_initialized_(false)
is_initialized_(false),
nodes_({})
{
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand All @@ -103,7 +106,7 @@ rmw_context_impl_s::Data::Data(
///=============================================================================
rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_initialized_) {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -145,7 +148,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
///=============================================================================
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -308,6 +311,7 @@ rmw_context_impl_s::rmw_context_impl_s(
free_shm_manager.cancel();

data_ = std::make_shared<Data>(
domain_id,
std::move(enclave),
std::move(session),
std::move(shm_manager),
Expand All @@ -323,35 +327,35 @@ rmw_context_impl_s::rmw_context_impl_s(
///=============================================================================
std::string rmw_context_impl_s::enclave() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->enclave_;
}

///=============================================================================
z_session_t rmw_context_impl_s::session() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return z_loan(data_->session_);
}

///=============================================================================
std::optional<zc_owned_shm_manager_t> & rmw_context_impl_s::shm_manager()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_manager_;
}

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_guard_condition_.get();
}

///=============================================================================
size_t rmw_context_impl_s::get_next_entity_id()
std::size_t rmw_context_impl_s::get_next_entity_id()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->next_entity_id_++;
}

Expand All @@ -364,20 +368,79 @@ rmw_ret_t rmw_context_impl_s::shutdown()
///=============================================================================
bool rmw_context_impl_s::is_shutdown() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->is_shutdown_;
}

///=============================================================================
bool rmw_context_impl_s::session_is_valid() const
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return z_check(data_->session_);
}

///=============================================================================
std::shared_ptr<rmw_zenoh_cpp::GraphCache> rmw_context_impl_s::graph_cache()
{
std::lock_guard<std::mutex> lock(data_->mutex_);
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->graph_cache_;
}

///=============================================================================
bool rmw_context_impl_s::create_node_data(
const rmw_node_t * const node,
const std::string & ns,
const std::string & node_name)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
if (data_->nodes_.count(node) > 0) {
// Node already exists.
return false;
}

// Check that the Zenoh session is still valid.
if (!z_check(data_->session_)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create NodeData as Zenoh session is invalid.");
return false;
}

auto node_data = rmw_zenoh_cpp::NodeData::make(
this->get_next_entity_id(),
z_loan(data_->session_),
data_->domain_id_,
ns,
node_name,
data_->enclave_);
if (node_data == nullptr) {
// Error already handled.
return false;
}

auto node_insertion = data_->nodes_.insert(std::make_pair(node, std::move(node_data)));
if (!node_insertion.second) {
return false;
}

return true;
}

///=============================================================================
std::shared_ptr<rmw_zenoh_cpp::NodeData> rmw_context_impl_s::get_node_data(
const rmw_node_t * const node)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
auto node_it = data_->nodes_.find(node);
if (node_it == data_->nodes_.end()) {
return nullptr;
}
return node_it->second;
}

///=============================================================================
void rmw_context_impl_s::delete_node_data(const rmw_node_t * const node)
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
data_->nodes_.erase(node);
}
30 changes: 26 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

#include "graph_cache.hpp"
#include "guard_condition.hpp"
#include "liveliness_utils.hpp"
#include "rmw_node_data.hpp"

#include "rcutils/types.h"
#include "rmw/rmw.h"
Expand All @@ -34,7 +36,7 @@
class rmw_context_impl_s final
{
public:
// Constructor that internally initializees the Zenoh session and other artifacts.
// Constructor that internally initializes the Zenoh session and other artifacts.
// Throws an std::runtime_error if any of the initializations fail.
// The construction will block until a Zenoh router is detected.
// TODO(Yadunund): Make this a non-blocking call by checking for the Zenoh
Expand Down Expand Up @@ -62,7 +64,7 @@ class rmw_context_impl_s final
rmw_guard_condition_t * graph_guard_condition();

// Get a unique id for a new entity.
size_t get_next_entity_id();
std::size_t get_next_entity_id();

// Shutdown the Zenoh session.
rmw_ret_t shutdown();
Expand All @@ -76,6 +78,21 @@ class rmw_context_impl_s final
/// Return a shared_ptr to the GraphCache stored in this context.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache();

/// Create a NodeData and store it within this context. The NodeData can be
/// retrieved using get_node().
/// Returns false if parameters are invalid.
bool create_node_data(
const rmw_node_t * const node,
const std::string & ns,
const std::string & node_name);

/// Retrieve the NodeData for a given rmw_node_t if present.
std::shared_ptr<rmw_zenoh_cpp::NodeData> get_node_data(
const rmw_node_t * const node);

/// Delete the NodeData for a given rmw_node_t if present.
void delete_node_data(const rmw_node_t * const node);

private:
// Bundle all class members into a data struct which can be passed as a
// weak ptr to various threads for thread-safe access without capturing
Expand All @@ -84,6 +101,7 @@ class rmw_context_impl_s final
{
// Constructor.
Data(
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<zc_owned_shm_manager_t> shm_manager,
Expand All @@ -100,11 +118,13 @@ class rmw_context_impl_s final
~Data();

// Mutex to lock when accessing members.
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
// RMW allocator.
const rcutils_allocator_t * allocator_;
// Enclave, name used to find security artifacts in a sros2 keystore.
std::string enclave_;
// The ROS domain id of this context.
std::size_t domain_id_;
// An owned session.
z_owned_session_t session_;
// An optional SHM manager that is initialized of SHM is enabled in the
Expand All @@ -124,9 +144,11 @@ class rmw_context_impl_s final
// Shutdown flag.
bool is_shutdown_;
// A counter to assign a local id for every entity created in this session.
size_t next_entity_id_;
std::size_t next_entity_id_;
// True once graph subscriber is initialized.
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
14 changes: 0 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
struct rmw_node_data_t
{
// The Entity generated for the node.
std::shared_ptr<liveliness::Entity> entity;

// Liveliness token for the node.
zc_owned_liveliness_token_t token;

// The entity id of this node as generated by get_next_entity_id().
// Every interface created by this node will include this id in its liveliness token.
size_t id;
};

///=============================================================================
class rmw_publisher_data_t final
{
Expand Down
Loading

0 comments on commit 824886a

Please sign in to comment.