Skip to content

Commit

Permalink
Merge branch 'rolling' into francocipollone/router_via_zenoh_session
Browse files Browse the repository at this point in the history
  • Loading branch information
Yadunund committed Jan 25, 2024
2 parents 93cf50a + 76fec8b commit 7770f2f
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 153 deletions.
13 changes: 6 additions & 7 deletions rmw_zenoh_cpp/src/detail/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ void GuardCondition::detach_condition()
}

///==============================================================================
bool GuardCondition::has_triggered() const
bool GuardCondition::get_and_reset_trigger()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
return has_triggered_;
}
bool ret = has_triggered_;

///==============================================================================
void GuardCondition::reset_trigger()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
// There is no data associated with the guard condition, so as soon as the callers asks about the
// state, we can immediately reset and get ready for the next trigger.
has_triggered_ = false;

return ret;
}
5 changes: 1 addition & 4 deletions rmw_zenoh_cpp/src/detail/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ class GuardCondition final

void detach_condition();

bool has_triggered() const;

// Resets has_triggered_ to false.
void reset_trigger();
bool get_and_reset_trigger();

private:
mutable std::mutex internal_mutex_;
Expand Down
251 changes: 199 additions & 52 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,196 @@ saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uin
memcpy(publisher_gid, pub_gid, 16);
}

void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_subscription_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_subscription_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

bool rmw_subscription_data_t::message_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
return message_queue_.empty();
}

std::unique_ptr<saved_msg_data> rmw_subscription_data_t::pop_next_message()
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);

if (message_queue_.empty()) {
// This tells rcl that the check for a new message was done, but no messages have come in yet.
return nullptr;
}

std::unique_ptr<saved_msg_data> msg_data = std::move(message_queue_.front());
message_queue_.pop_front();

return msg_data;
}

void rmw_subscription_data_t::add_new_message(
std::unique_ptr<saved_msg_data> msg, const std::string & topic_name)
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);

if (message_queue_.size() >= queue_depth) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Message queue depth of %ld reached, discarding oldest message "
"for subscription for %s",
queue_depth,
topic_name.c_str());

std::unique_ptr<saved_msg_data> old = std::move(message_queue_.front());
z_drop(z_move(old->payload));
message_queue_.pop_front();
}

message_queue_.emplace_back(std::move(msg));

// Since we added new data, trigger the guard condition if it is available
notify();
}

bool rmw_service_data_t::query_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
return query_queue_.empty();
}

void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_service_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
if (query_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_queue_.front());
query_queue_.pop_front();

return query;
}

void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(query_queue_mutex_);
query_queue_.emplace_back(std::move(query));

// Since we added new data, trigger the guard condition if it is available
notify();
}

bool rmw_service_data_t::add_to_query_map(
int64_t sequence_number, std::unique_ptr<ZenohQuery> query)
{
std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) {
return false;
}
sequence_to_query_map_.emplace(
std::pair(sequence_number, std::move(query)));

return true;
}

std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequence_number)
{
std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
auto query_it = sequence_to_query_map_.find(sequence_number);
if (query_it == sequence_to_query_map_.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
sequence_to_query_map_.erase(query_it);

return query;
}

void rmw_client_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
if (condition_ != nullptr) {
condition_->notify_one();
}
}

void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);
reply_queue_.emplace_back(std::move(reply));

notify();
}

bool rmw_client_data_t::reply_queue_is_empty() const
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

return reply_queue_.empty();
}

void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = condition_variable;
}

void rmw_client_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
condition_ = nullptr;
}

std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
{
std::lock_guard<std::mutex> lock(reply_queue_mutex_);

if (reply_queue_.empty()) {
return nullptr;
}

std::unique_ptr<ZenohReply> latest_reply = std::move(reply_queue_.front());
reply_queue_.pop_front();

return latest_reply;
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
Expand All @@ -53,34 +243,10 @@ void sub_data_handler(
return;
}

{
std::lock_guard<std::mutex> lock(sub_data->message_queue_mutex);

if (sub_data->message_queue.size() >= sub_data->queue_depth) {
// Log warning if message is discarded due to hitting the queue depth
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"Message queue depth of %ld reached, discarding oldest message "
"for subscription for %s",
sub_data->queue_depth,
z_loan(keystr));

std::unique_ptr<saved_msg_data> old = std::move(sub_data->message_queue.front());
z_drop(&old->payload);
sub_data->message_queue.pop_front();
}

sub_data->message_queue.emplace_back(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id));

// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(sub_data->internal_mutex);
if (sub_data->condition != nullptr) {
sub_data->condition->notify_one();
}
}
sub_data->add_new_message(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr));
}

ZenohQuery::ZenohQuery(const z_query_t * query)
Expand Down Expand Up @@ -118,18 +284,7 @@ void service_data_handler(const z_query_t * query, void * data)
return;
}

// Get the query parameters and payload
{
std::lock_guard<std::mutex> lock(service_data->query_queue_mutex);
service_data->query_queue.emplace_back(std::make_unique<ZenohQuery>(query));
}
{
// Since we added new data, trigger the guard condition if it is available
std::lock_guard<std::mutex> internal_lock(service_data->internal_mutex);
if (service_data->condition != nullptr) {
service_data->condition->notify_one();
}
}
service_data->add_new_query(std::make_unique<ZenohQuery>(query));
}

ZenohReply::ZenohReply(const z_owned_reply_t * reply)
Expand Down Expand Up @@ -182,16 +337,8 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
);
return;
}
{
std::lock_guard<std::mutex> msg_lock(client_data->replies_mutex);
// Take ownership of the reply.
client_data->replies.emplace_back(std::make_unique<ZenohReply>(reply));
*reply = z_reply_null();
}
{
std::lock_guard<std::mutex> internal_lock(client_data->internal_mutex);
if (client_data->condition != nullptr) {
client_data->condition->notify_one();
}
}

client_data->add_new_reply(std::make_unique<ZenohReply>(reply));
// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}
Loading

0 comments on commit 7770f2f

Please sign in to comment.