Skip to content

Commit

Permalink
Attach sequence number, publisher GID, and source timestamp to public…
Browse files Browse the repository at this point in the history
…ations.

That way, the subscriptions can pull them out of the
attachment and pass it to the upper layers.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Mar 4, 2024
1 parent 191a56b commit cff4ea0
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 164 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
src/detail/identifier.cpp
src/detail/graph_cache.cpp
src/detail/guard_condition.cpp
Expand Down
93 changes: 93 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <zenoh.h>

#include <cstdlib>
#include <cstring>
#include <string>

#include "rmw/types.h"

#include "attachment_helpers.hpp"

bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE])
{
if (!z_check(*attachment)) {
return false;
}

z_bytes_t index = z_attachment_get(*attachment, z_bytes_new("source_gid"));
if (!z_check(index)) {
return false;
}

if (index.len != RMW_GID_STORAGE_SIZE) {
return false;
}

memcpy(gid, index.start, index.len);

return true;
}

int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name)
{
if (!z_check(*attachment)) {
// A valid request must have had an attachment
return -1;
}

z_bytes_t index = z_attachment_get(*attachment, z_bytes_new(name.c_str()));
if (!z_check(index)) {
return -1;
}

if (index.len < 1) {
return -1;
}

if (index.len > 19) {
// The number was larger than we expected
return -1;
}

// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
char int64_str[20];

memcpy(int64_str, index.start, index.len);
int64_str[index.len] = '\0';

errno = 0;
char * endptr;
int64_t num = strtol(int64_str, &endptr, 10);
if (num == 0) {
// This is an error regardless; the client should never send this
return -1;
} else if (endptr == int64_str) {
// No values were converted, this is an error
return -1;
} else if (*endptr != '\0') {
// There was junk after the number
return -1;
} else if (errno != 0) {
// Some other error occurred, which may include overflow or underflow
return -1;
}

return num;
}
30 changes: 30 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_

#include <zenoh.h>

#include <string>

#include "rmw/types.h"

bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);

int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name);

#endif // DETAIL__ATTACHMENT_HELPERS_HPP_
54 changes: 48 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,43 @@

#include <zenoh.h>

#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

#include "rcpputils/scope_exit.hpp"
#include "rcutils/logging_macros.h"

#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"

///==============================================================================
saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
saved_msg_data::saved_msg_data(
zc_owned_payload_t p,
uint64_t recv_ts,
const uint8_t pub_gid[RMW_GID_STORAGE_SIZE],
int64_t seqnum,
int64_t source_ts)
: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts)
{
memcpy(publisher_gid, pub_gid, 16);
memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE);
}

saved_msg_data::~saved_msg_data()
{
z_drop(z_move(payload));
}

size_t rmw_publisher_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
Expand Down Expand Up @@ -232,6 +247,7 @@ std::unique_ptr<ZenohReply> rmw_client_data_t::pop_next_reply()
}

//==============================================================================

void sub_data_handler(
const z_sample_t * sample,
void * data)
Expand All @@ -253,10 +269,36 @@ void sub_data_handler(
return;
}

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];
if (!get_gid_from_attachment(&sample->attachment, pub_gid)) {
// We failed to get the GID from the attachment. While this isn't fatal,
// it is unusual and so we should report it.
memset(pub_gid, 0, RMW_GID_STORAGE_SIZE);
RCUTILS_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to obtain publisher GID from the attachment.");
}

int64_t sequence_number = get_int64_from_attachment(&sample->attachment, "sequence_number");
if (sequence_number < 0) {
// We failed to get the sequence number from the attachment. While this
// isn't fatal, it is unusual and so we should report it.
sequence_number = 0;
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment.");
}

int64_t source_timestamp = get_int64_from_attachment(&sample->attachment, "source_timestamp");
if (source_timestamp < 0) {
// We failed to get the source timestamp from the attachment. While this
// isn't fatal, it is unusual and so we should report it.
source_timestamp = 0;
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment.");
}

sub_data->add_new_message(
std::make_unique<saved_msg_data>(
zc_sample_payload_rcinc(sample),
sample->timestamp.time, sample->timestamp.id.id), z_loan(keystr));
sample->timestamp.time, pub_gid, sequence_number, source_timestamp), z_loan(keystr));
}

ZenohQuery::ZenohQuery(const z_query_t * query)
Expand Down Expand Up @@ -318,8 +360,8 @@ std::optional<z_sample_t> ZenohReply::get_sample() const

size_t rmw_client_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex);
return sequence_number++;
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

//==============================================================================
Expand Down
28 changes: 21 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ struct rmw_node_data_t
};

///==============================================================================
struct rmw_publisher_data_t
class rmw_publisher_data_t final
{
public:
// An owned publisher.
z_owned_publisher_t pub;

Expand All @@ -93,7 +94,13 @@ struct rmw_publisher_data_t
// Context for memory allocation for messages.
rmw_context_t * context;

uint8_t pub_guid[RMW_GID_STORAGE_SIZE];
uint8_t pub_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

private:
std::mutex sequence_number_mutex_;
size_t sequence_number_{1};
};

///==============================================================================
Expand All @@ -111,13 +118,20 @@ void sub_data_handler(const z_sample_t * sample, void * sub_data);

struct saved_msg_data
{
explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]);
explicit saved_msg_data(
zc_owned_payload_t p,
uint64_t recv_ts,
const uint8_t pub_gid[RMW_GID_STORAGE_SIZE],
int64_t seqnum,
int64_t source_ts);

~saved_msg_data();

zc_owned_payload_t payload;
uint64_t recv_timestamp;
uint8_t publisher_gid[16];
uint8_t publisher_gid[RMW_GID_STORAGE_SIZE];
int64_t sequence_number;
int64_t source_timestamp;
};

///==============================================================================
Expand Down Expand Up @@ -266,7 +280,7 @@ class rmw_client_data_t final

rmw_context_t * context;

uint8_t client_guid[RMW_GID_STORAGE_SIZE];
uint8_t client_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

Expand All @@ -283,8 +297,8 @@ class rmw_client_data_t final
private:
void notify();

size_t sequence_number{1};
std::mutex sequence_number_mutex;
size_t sequence_number_{1};
std::mutex sequence_number_mutex_;

std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;
Expand Down
Loading

0 comments on commit cff4ea0

Please sign in to comment.