Skip to content

Commit

Permalink
Merge branch 'rolling' into yadu/bump_zenoh_with_tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
Yadunund committed Apr 18, 2024
2 parents 59ffcdd + 12ebc2e commit 63c7447
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 73 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ find_package(zenohc REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
src/detail/cdr.cpp
src/detail/event.cpp
src/detail/identifier.cpp
src/detail/graph_cache.cpp
Expand Down
42 changes: 42 additions & 0 deletions rmw_zenoh_cpp/src/detail/cdr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "fastcdr/Cdr.h"
#include "fastcdr/FastBuffer.h"
#include "fastcdr/config.h"

#include "cdr.hpp"

rmw_zenoh_cpp::Cdr::Cdr(eprosima::fastcdr::FastBuffer & fastbuffer)
#if FASTCDR_VERSION_MAJOR == 1
: cdr_(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR)
#else
: cdr_(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::DDS_CDR)
#endif
{
}

size_t rmw_zenoh_cpp::Cdr::get_serialized_data_length() const
{
#if FASTCDR_VERSION_MAJOR == 1
return cdr_.getSerializedDataLength();
#else
return cdr_.get_serialized_data_length();
#endif
}

eprosima::fastcdr::Cdr & rmw_zenoh_cpp::Cdr::get_cdr()
{
return cdr_;
}
39 changes: 39 additions & 0 deletions rmw_zenoh_cpp/src/detail/cdr.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__CDR_HPP_
#define DETAIL__CDR_HPP_

#include "fastcdr/Cdr.h"
#include "fastcdr/FastBuffer.h"

// A wrapper class to paper over the differences between Fast-CDR v1 and Fast-CDR v2
namespace rmw_zenoh_cpp
{
class Cdr final
{
public:
explicit Cdr(eprosima::fastcdr::FastBuffer & fastbuffer);

eprosima::fastcdr::Cdr & get_cdr();

size_t get_serialized_data_length() const;

private:
eprosima::fastcdr::Cdr cdr_;
};

} // namespace rmw_zenoh_cpp

#endif // DETAIL__CDR_HPP_
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ std::shared_ptr<Entity> Entity::make(
return std::make_shared<Entity>(
Entity{
zid_to_str(zid),
std::move(nid),
std::move(id),
nid,
id,
std::move(type),
std::move(node_info),
std::move(topic_info)});
Expand Down
60 changes: 51 additions & 9 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <utility>

Expand Down Expand Up @@ -208,31 +209,72 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
notify();
}

static size_t hash_gid(const rmw_request_id_t & request_id)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(request_id.writer_guid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
bool rmw_service_data_t::add_to_query_map(
int64_t sequence_number, std::unique_ptr<ZenohQuery> query)
const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query)
{
size_t hash = hash_gid(request_id);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) {
return false;

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;

sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq)));

it = sequence_to_query_map_.find(hash);
} else {
// Client already in the map

if (it->second.find(request_id.sequence_number) != it->second.end()) {
return false;
}
}
sequence_to_query_map_.emplace(
std::pair(sequence_number, std::move(query)));

it->second.insert(std::make_pair(request_id.sequence_number, std::move(query)));

return true;
}

///=============================================================================
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequence_number)
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(
const rmw_request_id_t & request_id)
{
size_t hash = hash_gid(request_id);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
auto query_it = sequence_to_query_map_.find(sequence_number);
if (query_it == sequence_to_query_map_.end()) {

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
return nullptr;
}

SequenceToQuery::iterator query_it = it->second.find(request_id.sequence_number);

if (query_it == it->second.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
sequence_to_query_map_.erase(query_it);
it->second.erase(query_it);

if (sequence_to_query_map_[hash].size() == 0) {
sequence_to_query_map_.erase(hash);
}

return query;
}
Expand Down
10 changes: 5 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -252,9 +251,9 @@ class rmw_service_data_t final

void add_new_query(std::unique_ptr<ZenohQuery> query);

bool add_to_query_map(int64_t sequence_number, std::unique_ptr<ZenohQuery> query);
bool add_to_query_map(const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query);

std::unique_ptr<ZenohQuery> take_from_query_map(int64_t sequence_number);
std::unique_ptr<ZenohQuery> take_from_query_map(const rmw_request_id_t & request_id);

DataCallbackManager data_callback_mgr;

Expand All @@ -265,8 +264,9 @@ class rmw_service_data_t final
std::deque<std::unique_ptr<ZenohQuery>> query_queue_;
mutable std::mutex query_queue_mutex_;

// Map to store the sequence_number -> query_id
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map_;
// Map to store the sequence_number (as given by the client) -> ZenohQuery
using SequenceToQuery = std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>>;
std::unordered_map<size_t, SequenceToQuery> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

std::condition_variable * condition_{nullptr};
Expand Down
Loading

0 comments on commit 63c7447

Please sign in to comment.