Skip to content

Commit

Permalink
merge: integrate rolling into dev/1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Nov 21, 2024
2 parents 5ade015 + 8306a63 commit 1e705c1
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 96 deletions.
13 changes: 8 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
z_owned_reply_t owned_reply;
z_reply_clone(&owned_reply, reply);
client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(owned_reply, received_timestamp));
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
}

///=============================================================================
Expand Down Expand Up @@ -447,12 +447,13 @@ rmw_ret_t ClientData::send_request(

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
z_owned_closure_reply_t callback;
z_closure(&callback, client_data_handler, client_data_drop, this);
num_in_flight_++;
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
z_loan(keyexpr_), "",
z_move(callback),
z_move(zn_closure_reply),
&opts);

return RMW_RET_OK;
Expand Down Expand Up @@ -536,7 +537,7 @@ bool ClientData::shutdown_and_query_in_flight()
///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::unique_lock<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
Expand All @@ -548,6 +549,8 @@ void ClientData::decrement_in_flight_and_conditionally_remove()
if (node_data == nullptr) {
return;
}
    // Unlock before deleting ourselves: otherwise the lock's destructor would
    // try to unlock an already-destroyed mutex, which is undefined behavior.
lock.unlock();
node_data->delete_client_data(rmw_client_);
}
}
Expand Down
13 changes: 0 additions & 13 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,6 @@ class rmw_context_impl_s::Data final
return ret;
}

// Shutdown all the nodes in this context.
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
ret = node_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
node_it->second->id(),
ret
);
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
Expand Down
80 changes: 15 additions & 65 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ NodeData::~NodeData()
///=============================================================================
std::size_t NodeData::id() const
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
return id_;
}

Expand All @@ -127,7 +127,7 @@ bool NodeData::create_pub_data(
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -168,7 +168,7 @@ bool NodeData::create_pub_data(
///=============================================================================
PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = pubs_.find(publisher);
if (it == pubs_.end()) {
return nullptr;
Expand All @@ -180,7 +180,7 @@ PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
pubs_.erase(publisher);
}

Expand All @@ -194,7 +194,7 @@ bool NodeData::create_sub_data(
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -236,7 +236,7 @@ bool NodeData::create_sub_data(
///=============================================================================
SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subscription)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = subs_.find(subscription);
if (it == subs_.end()) {
return nullptr;
Expand All @@ -248,7 +248,7 @@ SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subs
///=============================================================================
void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
subs_.erase(subscription);
}

Expand All @@ -261,7 +261,7 @@ bool NodeData::create_service_data(
const rosidl_service_type_support_t * type_supports,
const rmw_qos_profile_t * qos_profile)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -302,7 +302,7 @@ bool NodeData::create_service_data(
///=============================================================================
ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = services_.find(service);
if (it == services_.end()) {
return nullptr;
Expand All @@ -314,7 +314,7 @@ ServiceDataPtr NodeData::get_service_data(const rmw_service_t * const service)
///=============================================================================
void NodeData::delete_service_data(const rmw_service_t * const service)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
services_.erase(service);
}

Expand All @@ -328,7 +328,7 @@ bool NodeData::create_client_data(
const rosidl_service_type_support_t * type_supports,
const rmw_qos_profile_t * qos_profile)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -370,7 +370,7 @@ bool NodeData::create_client_data(
///=============================================================================
ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto it = clients_.find(client);
if (it == clients_.end()) {
return nullptr;
Expand All @@ -382,7 +382,7 @@ ClientDataPtr NodeData::get_client_data(const rmw_client_t * const client)
///=============================================================================
void NodeData::delete_client_data(const rmw_client_t * const client)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
auto client_it = clients_.find(client);
if (client_it == clients_.end()) {
return;
Expand All @@ -395,62 +395,12 @@ void NodeData::delete_client_data(const rmw_client_t * const client)
///=============================================================================
rmw_ret_t NodeData::shutdown()
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return ret;
}

// Shutdown all the entities within this node.
for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
ret = pub_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown publisher %s within id %zu. rmw_ret_t code: %zu.",
pub_it->second->topic_info().name_.c_str(),
id_,
ret
);
}
}
for (auto sub_it = subs_.begin(); sub_it != subs_.end(); ++sub_it) {
ret = sub_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown subscription %s within id %zu. rmw_ret_t code: %zu.",
sub_it->second->topic_info().name_.c_str(),
id_,
ret
);
}
}
for (auto srv_it = services_.begin(); srv_it != services_.end(); ++srv_it) {
ret = srv_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown service %s within id %zu. rmw_ret_t code: %zu.",
srv_it->second->topic_info().name_.c_str(),
id_,
ret
);
}
}
for (auto cli_it = clients_.begin(); cli_it != clients_.end(); ++cli_it) {
ret = cli_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown client %s within id %zu. rmw_ret_t code: %zu.",
cli_it->second->topic_info().name_.c_str(),
id_,
ret
);
}
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));

Expand All @@ -462,7 +412,7 @@ rmw_ret_t NodeData::shutdown()
// Check if the Node is shutdown.
bool NodeData::is_shutdown() const
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::recursive_mutex> lock(mutex_);
return is_shutdown_;
}

Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class NodeData final
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token);
// Internal mutex.
mutable std::mutex mutex_;
mutable std::recursive_mutex mutex_;
// The rmw_node_t associated with this NodeData.
const rmw_node_t * node_;
// The entity id of this node as generated by get_next_entity_id().
Expand Down
12 changes: 5 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ void service_data_handler(z_loaned_query_t * query, void * data)
return;
}

z_owned_query_t owned_query;
z_query_clone(&owned_query, query);
service_data->add_new_query(std::make_unique<ZenohQuery>(owned_query));
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

service_data->add_new_query(std::make_unique<ZenohQuery>(query, received_timestamp));
}

///=============================================================================
Expand Down Expand Up @@ -322,10 +323,7 @@ rmw_ret_t ServiceData::take_request(
RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment");
return RMW_RET_ERROR;
}

auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
request_header->received_timestamp = now_ns.count();
request_header->received_timestamp = query->get_received_timestamp();

// Add this query to the map, so that rmw_send_response can quickly look it up later.
const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid);
Expand Down
15 changes: 12 additions & 3 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ void create_map_and_set_sequence_num(
}

///=============================================================================
ZenohQuery::ZenohQuery(z_owned_query_t query) {query_ = query;}
ZenohQuery::ZenohQuery(
  const z_loaned_query_t * query,
  std::chrono::nanoseconds::rep received_timestamp)
: received_timestamp_(received_timestamp)
{
  // Take our own owned copy of the loaned query; it is released via
  // z_drop() in the destructor.
  z_query_clone(&query_, query);
}

///=============================================================================
// Returns the system-clock timestamp (nanoseconds since epoch) captured when
// the query arrived; callers use it to fill rmw request headers'
// received_timestamp field.
std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const
{
  return received_timestamp_;
}

///=============================================================================
// Releases the owned query clone acquired in the constructor.
ZenohQuery::~ZenohQuery() {z_drop(z_move(query_));}
// Loans (borrows, non-owning) the stored query for use with the zenoh C API.
const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);}

///=============================================================================
ZenohReply::ZenohReply(
z_owned_reply_t reply,
const z_loaned_reply_t * reply,
std::chrono::nanoseconds::rep received_timestamp)
{
reply_ = reply;
z_reply_clone(&reply_, reply);
received_timestamp_ = received_timestamp;
}

Expand Down
7 changes: 5 additions & 2 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ create_map_and_set_sequence_num(
class ZenohReply final
{
public:
ZenohReply(z_owned_reply_t reply, std::chrono::nanoseconds::rep received_timestamp);
ZenohReply(const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp);

~ZenohReply();

Expand All @@ -53,14 +53,17 @@ class ZenohReply final
// RAII wrapper that owns a clone of a zenoh query together with the
// system-clock timestamp (nanoseconds since epoch) at which it was received.
class ZenohQuery final
{
public:
  // Clones `query` into owned storage and records the reception timestamp.
  ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp);

  // Drops the owned query clone.
  ~ZenohQuery();

  // Loans (non-owning) the stored query for use with the zenoh C API.
  const z_loaned_query_t * get_query() const;

  // Timestamp recorded at construction time.
  std::chrono::nanoseconds::rep get_received_timestamp() const;

private:
  z_owned_query_t query_;
  std::chrono::nanoseconds::rep received_timestamp_;
};
} // namespace rmw_zenoh_cpp

Expand Down

0 comments on commit 1e705c1

Please sign in to comment.