Skip to content

Commit

Permalink
cleanups and shm
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 22, 2024
1 parent 8505a84 commit e71614a
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 70 deletions.
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ rmw_ret_t ClientData::send_request(
std::weak_ptr<rmw_zenoh_cpp::ClientData> client_data = shared_from_this();
zenoh::ZResult err;
std::string parameters;
context_impl->session_cpp()->get(
context_impl->session()->get(
keyexpr_.value(),
parameters,
[client_data](const zenoh::Reply& reply) {
Expand Down
81 changes: 27 additions & 54 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ class rmw_context_impl_s::Data final
throw std::runtime_error("Error configuring Zenoh session.");
}

// // Check if shm is enabled.
// z_owned_string_t shm_enabled;
// zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
// auto always_free_shm_enabled = rcpputils::make_scope_exit(
// [&shm_enabled]() {
// z_drop(z_move(shm_enabled));
// });
zenoh::ZResult result;

// Check if shm is enabled.
std::string shm_enabled = config.value().get(Z_CONFIG_SHARED_MEMORY_KEY, &result);
if (result != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Not able to get %s from the config file",
Z_CONFIG_SHARED_MEMORY_KEY);
}

// Initialize the zenoh session.
zenoh::ZResult result;
session_ = std::make_shared<zenoh::Session>(
std::move(config.value()),
zenoh::Session::SessionOptions::create_default(),
Expand Down Expand Up @@ -183,29 +185,15 @@ class rmw_context_impl_s::Data final

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
// if (strncmp(
// z_string_data(z_loan(shm_enabled)),
// "true",
// z_string_len(z_loan(shm_enabled))) == 0)
// {
// // TODO(yuyuan): determine the default alignment of SHM
// z_alloc_alignment_t alignment = {5};
// z_owned_memory_layout_t layout;
// z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);

// z_owned_shm_provider_t provider;
// if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
// RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
// throw std::runtime_error("Unable to create shm provider.");
// }
// shm_provider_ = provider;
// }
// auto free_shm_provider = rcpputils::make_scope_exit(
// [this]() {
// if (shm_provider_.has_value()) {
// z_drop(z_move(shm_provider_.value()));
// }
// });
if (shm_enabled == "true") {
auto layout = zenoh::MemoryLayout(SHM_BUFFER_SIZE_MB * 1024 * 1024, zenoh::AllocAlignment({5}));
zenoh::PosixShmProvider provider(layout, &result);
if (result != Z_OK) {
throw std::runtime_error("Unable to create shm provider.");
}
shm_provider_ = std::move(provider);

}

graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand Down Expand Up @@ -277,9 +265,6 @@ class rmw_context_impl_s::Data final
return RMW_RET_ERROR;
}

if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
is_shutdown_ = true;

// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
Expand All @@ -295,19 +280,13 @@ class rmw_context_impl_s::Data final
return enclave_;
}

const z_loaned_session_t * session() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return z_loan(session_->_0);
}

const std::shared_ptr<zenoh::Session> session_cpp() const
const std::shared_ptr<zenoh::Session> session() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return session_;
}

std::optional<z_owned_shm_provider_t> & shm_provider()
std::optional<zenoh::ShmProvider> & shm_provider()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return shm_provider_;
Expand All @@ -334,7 +313,7 @@ class rmw_context_impl_s::Data final
bool session_is_valid() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return !z_session_is_closed(z_loan(session_->_0));
return !session_->is_closed();
}

std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache()
Expand Down Expand Up @@ -365,7 +344,7 @@ class rmw_context_impl_s::Data final
auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
session_cpp(),
session(),
domain_id_,
ns,
node_name,
Expand Down Expand Up @@ -445,7 +424,7 @@ class rmw_context_impl_s::Data final
std::shared_ptr<zenoh::Session> session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
std::optional<zenoh::ShmProvider> shm_provider_;
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
Expand Down Expand Up @@ -487,19 +466,13 @@ std::string rmw_context_impl_s::enclave() const
}

///=============================================================================
const z_loaned_session_t * rmw_context_impl_s::session() const
{
return data_->session();
}

///=============================================================================
const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session_cpp() const
const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session() const
{
return data_->session_cpp();
return data_->session();
}

///=============================================================================
std::optional<z_owned_shm_provider_t> & rmw_context_impl_s::shm_provider()
std::optional<zenoh::ShmProvider> & rmw_context_impl_s::shm_provider()
{
return data_->shm_provider();
}
Expand Down
5 changes: 2 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,13 @@ class rmw_context_impl_s final
std::string enclave() const;

// Loan the Zenoh session.
const z_loaned_session_t * session() const;
const std::shared_ptr<zenoh::Session> session() const;

const std::shared_ptr<zenoh::Session> session_cpp() const;
// Get a reference to the shm_provider.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<z_owned_shm_provider_t> & shm_provider();
std::optional<zenoh::ShmProvider> & shm_provider();

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down
6 changes: 3 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ PublisherData::PublisherData(
///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider)
std::optional<zenoh::ShmProvider> & shm_provider)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand Down Expand Up @@ -228,7 +228,7 @@ rmw_ret_t PublisherData::publish(
if (shm_provider.has_value()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value();
auto provider = shm_provider.value()._0;
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
Expand Down Expand Up @@ -298,7 +298,7 @@ rmw_ret_t PublisherData::publish(
///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & /*shm_provider*/)
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ class PublisherData final
// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
std::optional<zenoh::ShmProvider> & shm_provider);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
std::optional<zenoh::ShmProvider> & shm_provider);

// Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
6 changes: 3 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ bool SubscriptionData::init()
sub_options.query_consolidation = zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE);

std::weak_ptr<SubscriptionData> data_wp = shared_from_this();
auto sub = context_impl->session_cpp()->declare_querying_subscriber(
auto sub = context_impl->session()->declare_querying_subscriber(
sub_ke,
[data_wp](const zenoh::Sample& sample) {
auto sub_data = data_wp.lock();
Expand Down Expand Up @@ -284,7 +284,7 @@ bool SubscriptionData::init()
} else {
zenoh::Session::SubscriberOptions sub_options = zenoh::Session::SubscriberOptions::create_default();
std::weak_ptr<SubscriptionData> data_wp = shared_from_this();
zenoh::Subscriber<void> sub = context_impl->session_cpp()->declare_subscriber(
zenoh::Subscriber<void> sub = context_impl->session()->declare_subscriber(
sub_ke,
[data_wp](const zenoh::Sample & sample) {

Expand Down Expand Up @@ -330,7 +330,7 @@ bool SubscriptionData::init()

// Publish to the graph that a new subscription is in town.
std::string liveliness_keyexpr = entity_->liveliness_keyexpr();
token_ = context_impl->session_cpp()->liveliness_declare_token(
token_ = context_impl->session()->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
&err);
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ rmw_create_publisher(

if (!node_data->create_pub_data(
rmw_publisher,
context_impl->session_cpp(),
context_impl->session(),
context_impl->get_next_entity_id(),
topic_name,
type_support,
Expand Down Expand Up @@ -939,7 +939,7 @@ rmw_create_subscription(

if (!node_data->create_sub_data(
rmw_subscription,
context_impl->session_cpp(),
context_impl->session(),
context_impl->graph_cache(),
context_impl->get_next_entity_id(),
topic_name,
Expand Down Expand Up @@ -1416,7 +1416,7 @@ rmw_create_client(

if (!node_data->create_client_data(
rmw_client,
context_impl->session_cpp(),
context_impl->session(),
context_impl->get_next_entity_id(),
service_name,
type_support,
Expand Down Expand Up @@ -1660,7 +1660,7 @@ rmw_create_service(

if (!node_data->create_service_data(
rmw_service,
context_impl->session_cpp(),
context_impl->session(),
context_impl->get_next_entity_id(),
service_name,
type_support,
Expand Down

0 comments on commit e71614a

Please sign in to comment.