Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zenoh 1.0.0 Porting #276

Open
wants to merge 133 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
133 commits
Select commit Hold shift + click to select a range
e2fabb6
Update style.yaml
Mallets Jul 23, 2024
6cd6dbe
Update check_logging.yaml
Mallets Jul 23, 2024
dcefb68
Update build.yaml
Mallets Jul 23, 2024
14fe58c
chore: configure the compiliation
YuanYuYuan Jun 20, 2024
301a8ce
chore: complete the 1st version
YuanYuYuan Jun 25, 2024
8ef92b5
fix: memory leak
YuanYuYuan Jul 18, 2024
06b6fa4
wip
YuanYuYuan Jul 25, 2024
71e7c5b
fix: z_error_t -> z_result_t
YuanYuYuan Jul 26, 2024
a717270
Fix `scouting/*/autoconnect/*` per eclipse-zenoh/zenoh@b31a410 (#3)
fuzzypixelz Jul 24, 2024
cfa63d9
chore: checkout the local zenoh-c
YuanYuYuan Jul 30, 2024
12dc820
chore: polish z_open
YuanYuYuan Jul 31, 2024
d331b74
fix: segfault
YuanYuYuan Jul 31, 2024
1db4bef
feat: `z_bytes_serialize_from_slice` without copy
YuanYuYuan Jul 31, 2024
d12f189
chore: switch back to https://github.com/eclipse-zenoh/zenoh-c
YuanYuYuan Jul 31, 2024
909329b
Initialize `query_` member of `ZenohQuery`
fuzzypixelz Jul 26, 2024
511c6f6
refactor: use `z_owned_slice_t` instead
YuanYuYuan Jul 31, 2024
044d25b
chore: adapt the latest change of zenoh-c dev/1.0.0
YuanYuYuan Aug 2, 2024
d79cf5e
chore: use `strncmp` to avoid copying
YuanYuYuan Aug 7, 2024
6637400
refactor: use `z_view_keyexpr_t` to avoid copying
YuanYuYuan Aug 8, 2024
83334c8
feat: implment the SHM feature
YuanYuYuan Aug 8, 2024
bf0da8a
chore: adpat the new changes from zenoh-c and fix the bug in liveliness
YuanYuYuan Aug 13, 2024
dfc95ad
fix: segmentation fault due to the unallocated query memory
YuanYuYuan Aug 18, 2024
cd071e5
fix: workaround the ZID parsing issue
YuanYuYuan Aug 26, 2024
4f842fd
fix Zenoh Config read\check
yellowhatter Aug 22, 2024
af2e344
adopt to recent zenoh-c API changes
yellowhatter Aug 23, 2024
0225a59
chore: sync up with the rolling branch
YuanYuYuan Aug 27, 2024
36140fd
chore: update to self-hosted runners
imstevenpmwork Aug 27, 2024
080eb15
fix: adapt the latest change of batching config
YuanYuYuan Aug 29, 2024
f60c2a2
Merge remote-tracking branch 'origin/rolling' into dev/1.0.0
YuanYuYuan Aug 29, 2024
83a26f2
build: deprecate the zenohc_debug and incldue the zenohc dependency i…
YuanYuYuan Aug 29, 2024
31a76fd
Use main branch for upgrading to Zenoh 1.0
evshary Sep 2, 2024
7dba674
Merge pull request #14 from ZettaScaleLabs/update_branch
evshary Sep 2, 2024
b5cdd73
Increase the delay in scouting (#16)
evshary Sep 4, 2024
191181a
Merge remote-tracking branch 'origin/rolling' into dev/1.0.0
YuanYuYuan Sep 5, 2024
aed64e8
fixup! Merge remote-tracking branch 'origin/rolling' into dev/1.0.0
YuanYuYuan Sep 5, 2024
dd0e541
chore: ament_uncrustify and ament_cpplint
YuanYuYuan Sep 5, 2024
7dcc7d6
ci: fix the argument order in the style CI
YuanYuYuan Sep 5, 2024
7bbfdf5
chore: remove .clang_format
YuanYuYuan Sep 5, 2024
ed51a81
chore: clean up
YuanYuYuan Sep 5, 2024
975ea82
refactor: use `z_id_to_string`
YuanYuYuan Sep 5, 2024
17adcfa
chore: tidy up
YuanYuYuan Sep 5, 2024
e62c9bd
build: use the latest zenoh-c commit
YuanYuYuan Sep 5, 2024
458414d
build: enable the unstable feature flag
YuanYuYuan Sep 5, 2024
bf49775
chore: make iron ament_uncrustify happy
YuanYuYuan Sep 5, 2024
1630e11
test: check uncrustify version
YuanYuYuan Sep 5, 2024
c0a7533
fixup! test: check uncrustify version
YuanYuYuan Sep 5, 2024
798b006
chore: make iron ament_uncrustify happy
YuanYuYuan Sep 5, 2024
7d7a296
test
YuanYuYuan Sep 5, 2024
30aae80
Revert "test"
YuanYuYuan Sep 5, 2024
4f369c5
build: bump up the zenoh-c commit
YuanYuYuan Sep 9, 2024
39f10d5
chore(style): align rmw_zenoh.cpp
YuanYuYuan Sep 10, 2024
c7c17a6
chore(styel): align detail/rmw_data_types.cpp
YuanYuYuan Sep 10, 2024
1eef11d
chore(style): align rmw_init.cpp
YuanYuYuan Sep 10, 2024
e57ce89
chore(style): align detail/attachment_helpers.cpp
YuanYuYuan Sep 10, 2024
86940a8
build: update zenoh-c version
YuanYuYuan Sep 10, 2024
3aba418
chore(style): align the remaining files
YuanYuYuan Sep 10, 2024
05ff443
chore(style): consistently use `Z_OK` in the if condition
YuanYuYuan Sep 10, 2024
dd1e691
chore(style): ament_uncrustify
YuanYuYuan Sep 10, 2024
5bf7cc0
chore(style): ament_cpplint
YuanYuYuan Sep 10, 2024
206e34e
fix: set the max size of initial query queue to `SIZE_MAX - 1`
YuanYuYuan Sep 11, 2024
e9d0513
fix: iterator memory leak
YuanYuYuan Sep 16, 2024
520a3a2
feat: update to zenoh-c 1.0.0.8 changes
imstevenpmwork Sep 13, 2024
fce8a62
chore(style): address `ament_cpplint` and `ament_uncrustiy`
YuanYuYuan Sep 16, 2024
09c5cbc
fix: initiate zenoh logger
YuanYuYuan Sep 17, 2024
54dd96a
chore: apply the suggestions
YuanYuYuan Sep 19, 2024
becea93
chore: add the comments for the zenoh logger
YuanYuYuan Sep 19, 2024
9df5b82
Merge branch 'rolling' into dev/1.0.0
YuanYuYuan Sep 20, 2024
1f135c2
fix: store and destroy the subscriber properly
YuanYuYuan Sep 20, 2024
5177594
chore: improve the null pointer check: NULL => nullptr
YuanYuYuan Sep 20, 2024
6a48919
Change liveliness tokens logs from warn to debug level (#22)
JEnoch Sep 23, 2024
6fd7e1c
fix: properly clone the pointer of query and reply to resolve the seg…
YuanYuYuan Sep 27, 2024
8d3fce9
chore: update to zenoh-c 1.0.0.9 (#23)
imstevenpmwork Sep 27, 2024
73d68af
Sleep for 100ms between router checks (#284)
Yadunund Sep 25, 2024
16916cb
Fix how total_count and total_count_change are calculated for matched…
Yadunund Sep 27, 2024
f089982
Thread-safe access to graph cache (#258)
Yadunund Sep 27, 2024
63530b7
fix: adapt the latest changes in rolling
YuanYuYuan Sep 30, 2024
afd7f63
Merge remote-tracking branch 'origin/rolling' into dev/1.0.0
YuanYuYuan Sep 30, 2024
a67b194
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Oct 1, 2024
c8724d2
fix: addres the conflict
YuanYuYuan Oct 2, 2024
979896a
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Oct 2, 2024
d022967
chore: adapt the latest change
YuanYuYuan Oct 2, 2024
1cfc9f6
refactor(api): align with latest serialization changes
YuanYuYuan Oct 2, 2024
bb64fde
chore(deps): bump up zenoh-c to 1.0.0.10
YuanYuYuan Oct 2, 2024
4aef9e7
chore(api): align with latest serialization changes
imstevenpmwork Oct 2, 2024
c386a3d
fix: correct the sub_ke and selector_ke in the querying_subscriber
YuanYuYuan Oct 3, 2024
37134bb
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Oct 7, 2024
93e733f
fix: thread-safe publisher
YuanYuYuan Oct 7, 2024
eeb9985
Enable history option for liveliness subscriber. (#27)
evshary Oct 8, 2024
bec4a52
refactor!: adopt the TLS config renaming
YuanYuYuan Oct 8, 2024
f32daa9
refactor: allow Zenoh session to close without dropping
YuanYuYuan Oct 8, 2024
14d17b0
fix: address the failure in rclcpp/test_wait_for_message of delcaring…
YuanYuYuan Oct 8, 2024
60a2e8d
Merge branch 'wip/merge-rolling' into dev/1.0.0
YuanYuYuan Oct 8, 2024
589971e
test: close but not drop the session
YuanYuYuan Oct 8, 2024
9b65df5
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Oct 9, 2024
c7d3a5b
fix: correct the merge
YuanYuYuan Oct 9, 2024
6ba342b
chore: Explicit false in adminspace config
imstevenpmwork Oct 9, 2024
143fa88
fix: enable admin space in rmw router and ros nodes
gabrik Oct 10, 2024
52070b2
Merge pull request #29 from ZettaScaleLabs/fix/enable-admin-space
imstevenpmwork Oct 10, 2024
5ae0e3a
Bump zenoh-c version. (#30)
evshary Oct 11, 2024
257de16
Use the latest zenoh-c which fix some nav2 issues. (#31)
evshary Oct 14, 2024
736c34e
Update config files according to Zenoh 1.0.0 DEFAULT_CONFIG.json5 (#33)
JEnoch Oct 15, 2024
030959f
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Oct 24, 2024
ec8aa99
chore(zenoh_c_vendor): bumb up zenoh-c version
YuanYuYuan Oct 24, 2024
ab5e0be
fix: adjust the merge
YuanYuYuan Oct 24, 2024
24f479b
refactor: remove the free_attachment
YuanYuYuan Oct 24, 2024
e6b7ecb
Fix unset request header writer GUID (#36)
fuzzypixelz Oct 24, 2024
bc6f6d6
Merge branch 'dev/1.0.0' into wip/merge-rolling
YuanYuYuan Oct 24, 2024
8024b7a
fix: keyexpr is missing in the service
YuanYuYuan Oct 24, 2024
3a5cc80
style: ament_uncrustify
YuanYuYuan Oct 24, 2024
5c3d385
style: apply iron ament_uncrustify
YuanYuYuan Oct 24, 2024
bb8e0ec
Avoid touching Zenoh Session while exiting. (#37)
evshary Oct 31, 2024
787376c
chore(deps): bump up zenoh-c to 1.0.1
YuanYuYuan Nov 4, 2024
ec7776e
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Nov 7, 2024
2cfa97c
fix: adjust the merge
YuanYuYuan Nov 7, 2024
28d917e
style: ament_cpplint
YuanYuYuan Nov 7, 2024
f400ab5
SHM support (#39)
yellowhatter Nov 13, 2024
14683cd
fix: use TRUE value to configure the feature flag
YuanYuYuan Nov 14, 2024
d44002e
fix: fix the SHM enabled check
YuanYuYuan Nov 14, 2024
8557523
fix: inlcude shm_context.cpp in the build
YuanYuYuan Nov 14, 2024
cb87b1e
revert: SHM support
YuanYuYuan Nov 15, 2024
46ec449
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Nov 15, 2024
2743344
fix: adapt the merge and bump up zenoh-c version
YuanYuYuan Nov 18, 2024
09798c3
chore: remove .gitignore
YuanYuYuan Nov 19, 2024
b6df66c
fix: correct typo `attachement` to `attachment`
YuanYuYuan Nov 19, 2024
59203df
Merge branch 'rolling' into wip/merge-rolling
YuanYuYuan Nov 19, 2024
9f78ca4
style: ament_cpplint and ament_uncrustify
YuanYuYuan Nov 19, 2024
7450bb2
refactor: remove the warning of subscriber reliability QoS
YuanYuYuan Nov 20, 2024
5ade015
fix: add back atexit handler
YuanYuYuan Nov 20, 2024
1e705c1
merge: integrate rolling into dev/1.0.0
YuanYuYuan Nov 21, 2024
2e3ad43
Revert "Don't shutdown contained entities. (#313)"
YuanYuYuan Nov 21, 2024
c41ee5a
Reapply "Don't shutdown contained entities. (#313)"
YuanYuYuan Nov 22, 2024
aa19545
chore: include the string_array.h
YuanYuYuan Nov 22, 2024
b485a93
fix: include "rcutils/types/string_array.h"
YuanYuYuan Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
YuanYuYuan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
9 changes: 9 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY ON CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -80,6 +82,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PRIVATE
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,6 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
},
auth: {
Expand Down
88 changes: 51 additions & 37 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,24 @@ rmw_context_impl_s::Data::Data(
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache)
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
)
: enclave_(std::move(enclave)),
domain_id_(std::move(domain_id)),
session_(std::move(session)),
shm_provider_(std::move(shm_provider)),
liveliness_str_(std::move(liveliness_str)),
graph_cache_(std::move(graph_cache)),
is_shutdown_(false),
next_entity_id_(0),
is_initialized_(false),
nodes_({})
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, shm_(shm)
#endif
{
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand Down Expand Up @@ -178,14 +183,18 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
}

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}

// Don't touch Zenoh Session if the ROS process is exiting,
// it will cause panic.
if (!is_exiting) {
z_close(z_loan_mut(session_), NULL);
}

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// drop SHM subsystem if used
shm_ = std::nullopt;
#endif

is_shutdown_ = true;
return RMW_RET_OK;
}
Expand Down Expand Up @@ -214,14 +223,6 @@ rmw_context_impl_s::rmw_context_impl_s(
throw std::runtime_error("Error configuring Zenoh session.");
}

// Check if shm is enabled.
z_owned_string_t shm_enabled;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
auto always_free_shm_enabled = rcpputils::make_scope_exit(
[&shm_enabled]() {
z_drop(z_move(shm_enabled));
});

// Initialize the zenoh session.
z_owned_session_t session;
if (z_open(&session, z_move(config), NULL) != Z_OK) {
Expand Down Expand Up @@ -309,40 +310,51 @@ rmw_context_impl_s::rmw_context_impl_s(
}
z_drop(z_move(handler));

// Initialize the shm manager if shared_memory is enabled in the config.
std::optional<z_owned_shm_provider_t> shm_provider = std::nullopt;
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
std::optional<rmw_zenoh_cpp::ShmContext> shm;
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

// TODO(yuyuan): determine the default alignment of SHM
z_alloc_alignment_t alignment = {5};
z_owned_memory_layout_t layout;
z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);
rmw_zenoh_cpp::ShmContext shm_context;

// Read msg size treshold from config
shm_context.msgsize_threshold = rmw_zenoh_cpp::zenoh_shm_message_size_threshold();

z_owned_shm_provider_t provider;
if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create a SHM provider.");
throw std::runtime_error("Unable to create shm manager.");
// Create Layout for provider's memory
// Provider's alignment will be 1 byte as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
z_alloc_alignment_t alignment = {0};
z_owned_memory_layout_t layout;
if (z_memory_layout_new(&layout, rmw_zenoh_cpp::zenoh_shm_alloc_size(), alignment) != Z_OK) {
throw std::runtime_error("Unable to create a Layout for SHM provider.");
}
shm_provider = provider;
// Create SHM provider
const auto provider_creation_result =
z_posix_shm_provider_new(&shm_context.shm_provider, z_loan(layout));
z_drop(z_move(layout));
if (provider_creation_result != Z_OK) {
throw std::runtime_error("Unable to create an SHM provider.");
}
// Upon successful provider creation, store it in the context
shm = std::make_optional(std::move(shm_context));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
auto free_shm_provider = rcpputils::make_scope_exit(
[&shm_provider]() {
if (shm_provider.has_value()) {
z_drop(z_move(shm_provider.value()));
}
});
#endif

close_session.cancel();
free_shm_provider.cancel();

data_ = std::make_shared<Data>(
domain_id,
std::move(enclave),
std::move(session),
std::move(shm_provider),
std::move(liveliness_str),
std::move(graph_cache));
std::move(graph_cache)
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::move(shm)
#endif
);

ret = data_->subscribe_to_ros_graph();
if (ret != RMW_RET_OK) {
Expand Down Expand Up @@ -376,11 +388,13 @@ const z_loaned_session_t * rmw_context_impl_s::session() const
}

///=============================================================================
std::optional<z_owned_shm_provider_t> & rmw_context_impl_s::shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
std::lock_guard<std::recursive_mutex> lock(data_->mutex_);
return data_->shm_provider_;
return data_->shm_;
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
21 changes: 14 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ class rmw_context_impl_s final
// create other Zenoh objects.
const z_loaned_session_t * session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<z_owned_shm_provider_t> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down Expand Up @@ -103,9 +105,12 @@ class rmw_context_impl_s final
std::size_t domain_id,
const std::string & enclave,
z_owned_session_t session,
std::optional<z_owned_shm_provider_t> shm_provider,
const std::string & liveliness_str,
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache);
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<rmw_zenoh_cpp::ShmContext> shm
#endif
);

// Subscribe to the ROS graph.
rmw_ret_t subscribe_to_ros_graph();
Expand All @@ -126,9 +131,6 @@ class rmw_context_impl_s final
std::size_t domain_id_;
// An owned session.
z_owned_session_t session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
// Liveliness keyexpr string to subscribe to for ROS graph changes.
std::string liveliness_str_;
// Graph cache.
Expand All @@ -148,6 +150,11 @@ class rmw_context_impl_s final
bool is_initialized_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized if SHM is enabled in the
// zenoh session config.
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
};

std::shared_ptr<Data> data_{nullptr};
Expand Down
63 changes: 45 additions & 18 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -233,37 +236,51 @@ rmw_ret_t PublisherData::publish(
}

// Serialize data.
size_t max_data_length = type_support_->get_estimated_serialized_size(
const size_t max_data_length = type_support_->get_estimated_serialized_size(
ros_message,
type_support_impl_);

// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit(
[&shmbuf]() {
if (shmbuf.has_value()) {
z_drop(z_move(shmbuf.value()));
}
});
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm_provider.has_value()) {
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value();
auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
z_alloc_alignment_t alignment = {0};
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
Expand All @@ -275,11 +292,14 @@ rmw_ret_t PublisherData::publish(
return RMW_RET_ERROR;
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand Down Expand Up @@ -309,11 +329,15 @@ rmw_ret_t PublisherData::publish(
options.attachment = z_move(attachment);

z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (shmbuf.has_value()) {
z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
} else {
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
}
#endif
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
Expand All @@ -332,8 +356,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & /*shm_provider*/
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand Down
Loading