Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/rolling' into ahcorde/dev/1.0.0-cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
ahcorde committed Dec 11, 2024
2 parents 196f30d + ca5058c commit d4b9613
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 69 deletions.
67 changes: 16 additions & 51 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ std::shared_ptr<ClientData> ClientData::make(
node,
client,
entity,
session,
request_members,
response_members,
request_type_support,
Expand All @@ -153,22 +154,23 @@ ClientData::ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * rmw_client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_client_(rmw_client),
entity_(std::move(entity)),
sess_(std::move(sess)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(request_type_support),
response_type_support_(response_type_support),
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false),
initialized_(false),
num_in_flight_(0)
initialized_(false)
{
// Do nothing.
}
Expand Down Expand Up @@ -401,10 +403,6 @@ rmw_ret_t ClientData::send_request(
reinterpret_cast<const uint8_t *>(request_bytes) + data_length);
opts.payload = zenoh::Bytes(raw_bytes);

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
num_in_flight_++;

std::weak_ptr<rmw_zenoh_cpp::ClientData> client_data = shared_from_this();
zenoh::ZResult err;
std::string parameters;
Expand Down Expand Up @@ -448,7 +446,6 @@ rmw_ret_t ClientData::send_request(
"Unable to obtain ClientData");
return;
}
sub_data->decrement_in_flight_and_conditionally_remove();
},
std::move(opts),
&err);
Expand Down Expand Up @@ -500,63 +497,31 @@ bool ClientData::detach_condition_and_queue_is_empty()
}

///=============================================================================
void ClientData::_shutdown()
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return;
return RMW_RET_OK;
}

// Unregister this node from the ROS graph.
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return;
if (initialized_) {
zenoh::ZResult err;
std::move(token_).value().undeclare(&err);
if (err != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}
}

sess_.reset();
is_shutdown_ = true;
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return RMW_RET_OK;
}

///=============================================================================
bool ClientData::shutdown_and_query_in_flight()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return num_in_flight_ > 0;
}

///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
std::unique_lock<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
if (context_impl == nullptr) {
return;
}
std::shared_ptr<rmw_zenoh_cpp::NodeData> node_data = context_impl->get_node_data(rmw_node_);
if (node_data == nullptr) {
return;
}
// We have to unlock here since we are about to delete ourself, and thus the unlock would be UB.
lock.unlock();
node_data->delete_client_data(rmw_client_);
}
}

///=============================================================================
bool ClientData::is_shutdown() const
{
Expand Down
11 changes: 1 addition & 10 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Shutdown this ClientData.
rmw_ret_t shutdown();

// Shutdown this ClientData, and return whether there are any requests currently in flight.
bool shutdown_and_query_in_flight();

// Decrement the in flight requests, and if that drops to 0 remove the client from the node.
void decrement_in_flight_and_conditionally_remove();

// Check if this ClientData is shutdown.
bool is_shutdown() const;

Expand All @@ -114,6 +108,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_node_t * rmw_node,
const rmw_client_t * client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
Expand All @@ -122,9 +117,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Initialize the Zenoh objects for this entity.
bool init(const std::shared_ptr<zenoh::Session> session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();

// Internal mutex.
mutable std::recursive_mutex mutex_;
// The parent node.
Expand Down Expand Up @@ -155,7 +147,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
bool is_shutdown_;
// Whether the object has ever successfully been initialized.
bool initialized_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
using ClientDataConstPtr = std::shared_ptr<const ClientData>;
Expand Down
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include "rcpputils/scope_exit.hpp"
#include "rmw/error_handling.h"
#include "zenoh_utils.hpp"

// Megabytes of SHM to reserve.
// TODO(clalancette): Make this configurable, or get it from the configuration
Expand Down Expand Up @@ -262,12 +263,15 @@ class rmw_context_impl_s::Data final
return RMW_RET_ERROR;
}

session_.reset();
is_shutdown_ = true;

// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
}

// Drop the shared session.
session_.reset();

return RMW_RET_OK;
}

Expand Down
8 changes: 1 addition & 7 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
void NodeData::delete_client_data(const rmw_client_t * const client)
{
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto client_it = clients_.find(client);
if (client_it == clients_.end()) {
return;
}
if (!client_it->second->shutdown_and_query_in_flight()) {
clients_.erase(client);
}
clients_.erase(client);
}

///=============================================================================
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

#include "rcutils/allocator.h"

Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ rmw_ret_t SubscriptionData::shutdown()
}
}

sess_.reset();
is_shutdown_ = true;
initialized_ = false;
return ret;
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "message_type_support.hpp"
#include "attachment_helpers.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

#include "rcutils/allocator.h"

Expand Down
14 changes: 14 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@

namespace rmw_zenoh_cpp
{
/// Loan the zenoh session.
///=============================================================================
const z_loaned_session_t * ZenohSession::loan()
{
return z_loan(inner_);
}

/// Close the zenoh session if destructed.
///=============================================================================
ZenohSession::~ZenohSession()
{
z_close(z_loan_mut(inner_), NULL);
}

///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, std::vector<uint8_t> gid)
Expand Down
15 changes: 15 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@

namespace rmw_zenoh_cpp
{

/// A wrapped zenoh session with customized destruction.
///=============================================================================
class ZenohSession final
{
public:
ZenohSession(z_owned_session_t sess)
: inner_(sess) {}
const z_loaned_session_t * loan();
~ZenohSession();

private:
z_owned_session_t inner_;
};

///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, std::vector<uint8_t> gid);
Expand Down

0 comments on commit d4b9613

Please sign in to comment.